]> git.ipfire.org Git - pbs.git/blame - src/buildservice/jobs.py
Refactor uploads
[pbs.git] / src / buildservice / jobs.py
CommitLineData
2a1e9ce2
MT
1#!/usr/bin/python
2
3import datetime
4import hashlib
5import logging
6import os
7import shutil
8import uuid
9
10import pakfire
11import pakfire.config
12
13log = logging.getLogger("builds")
14log.propagate = 1
15
16from . import arches
17from . import base
18from . import logs
19from . import users
20
21from .constants import *
22from .decorators import *
23
class Jobs(base.Object):
	def _get_job(self, query, *args):
		"""Run *query* and wrap the first result row in a Job (or return None)."""
		row = self.db.get(query, *args)

		if not row:
			return

		return Job(self.backend, row.id, data=row)

	def _get_jobs(self, query, *args):
		"""Run *query* and lazily yield a Job for every result row."""
		for row in self.db.query(query, *args):
			yield Job(self.backend, row.id, data=row)
36
	def create(self, build, arch, test=False, superseeds=None):
		"""
		Creates a new build job for *build* on *arch*.

		test       -- create a test job instead of a regular build job
		superseeds -- an older Job object that this new job replaces

		Returns the new Job object.
		"""
		job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
			VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
		job.log("created")

		# Set cache for Build object.
		job.build = build

		# Mark if the new job superseeds some other job
		if superseeds:
			superseeds.superseeded_by = job

		# Jobs are by default in state "new" and wait for being checked
		# for dependencies. Packages that do have no build dependencies
		# can directly be forwarded to "pending" state.
		if not job.pkg.requires:
			job.state = "pending"

		return job
56
57 def get_by_id(self, id, data=None):
58 return Job(self.backend, id, data)
59
60 def get_by_uuid(self, uuid):
61 job = self.db.get("SELECT id FROM jobs WHERE uuid = %s", uuid)
62
63 if job:
64 return self.get_by_id(job.id)
65
2a1e9ce2
MT
66 def get_active(self, host_id=None, builder=None, states=None):
67 if builder:
68 host_id = builder.id
69
70 if states is None:
71 states = ["dispatching", "running", "uploading"]
72
73 query = "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states))
74 args = states
75
76 if host_id:
77 query += " AND builder_id = %s" % host_id
78
79 query += " ORDER BY \
80 CASE \
81 WHEN jobs.state = 'running' THEN 0 \
82 WHEN jobs.state = 'uploading' THEN 1 \
83 WHEN jobs.state = 'dispatching' THEN 2 \
84 WHEN jobs.state = 'pending' THEN 3 \
85 WHEN jobs.state = 'new' THEN 4 \
86 END, time_started ASC"
87
88 return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]
89
90 def get_latest(self, arch=None, builder=None, limit=None, age=None, date=None):
91 query = "SELECT * FROM jobs"
92 args = []
93
94 where = ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]
95
96 if arch:
97 where.append("arch = %s")
98 args.append(arch)
99
100 if builder:
101 where.append("builder_id = %s")
102 args.append(builder.id)
103
104 if date:
105 try:
106 year, month, day = date.split("-", 2)
107 date = datetime.date(int(year), int(month), int(day))
108 except ValueError:
109 pass
110 else:
111 where.append("(time_created::date = %s OR \
112 time_started::date = %s OR time_finished::date = %s)")
113 args += (date, date, date)
114
115 if age:
116 where.append("time_finished >= NOW() - '%s'::interval" % age)
117
118 if where:
119 query += " WHERE %s" % " AND ".join(where)
120
121 query += " ORDER BY time_finished DESC"
122
123 if limit:
124 query += " LIMIT %s"
125 args.append(limit)
126
127 return [Job(self.backend, j.id, j) for j in self.db.query(query, *args)]
128
129 def get_average_build_time(self):
130 """
131 Returns the average build time of all finished builds from the
132 last 3 months.
133 """
134 result = self.db.get("SELECT AVG(time_finished - time_started) as average \
135 FROM jobs WHERE type = 'build' AND state = 'finished' AND \
136 time_finished >= NOW() - '3 months'::interval")
137
138 if result:
139 return result.average
140
	def count(self, *states):
		"""
		Counts jobs, optionally restricted to the given states.

		Returns the count as an integer, or None if the query failed.
		"""
		query = "SELECT COUNT(*) AS count FROM jobs"
		args = []

		if states:
			# The states tuple is passed as a single parameter; the DB
			# layer expands it for the IN clause (psycopg2 tuple adaptation).
			query += " WHERE state IN %s"
			args.append(states)

		jobs = self.db.get(query, *args)
		if jobs:
			return jobs.count
152
	def restart_failed(self):
		"""
		Restarts every build job that failed more than 72 hours ago,
		unless its build has been marked as broken.
		"""
		jobs = self._get_jobs("SELECT jobs.* FROM jobs \
			JOIN builds ON builds.id = jobs.build_id \
			WHERE \
				jobs.type = 'build' AND \
				jobs.state = 'failed' AND \
				NOT builds.state = 'broken' AND \
				jobs.time_finished < NOW() - '72 hours'::interval \
			ORDER BY \
				CASE \
					WHEN jobs.type = 'build' THEN 0 \
					WHEN jobs.type = 'test' THEN 1 \
				END, \
				builds.priority DESC, jobs.time_created ASC")

		# Restart the job
		for job in jobs:
			job.restart()
172
class Job(base.DataObject):
	# Database table this object is backed by.
	table = "jobs"

	def __str__(self):
		# repr-style string with class name, id and the job's display name.
		return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)

	def __eq__(self, other):
		# Jobs are equal when their database ids match.
		# NOTE(review): returns None (falsy) for non-Job operands instead
		# of NotImplemented - works in practice, but asymmetric.
		if isinstance(other, self.__class__):
			return self.id == other.id

	def __lt__(self, other):
		if isinstance(other, self.__class__):
			# Regular build jobs sort before test jobs.
			if not self.test and other.test:
				return True

			# Within the same build, order by architecture priority.
			if self.build == other.build:
				return arches.priority(self.arch) < arches.priority(other.arch)

			# Otherwise, oldest first.
			return self.time_created < other.time_created

	def __iter__(self):
		# Iterates over all packages this job produced, sorted by name.
		packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
			LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
			WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)

		return iter(packages)

	def __nonzero__(self):
		# A Job is always truthy, even when __len__ would return 0.
		return True

	def __len__(self):
		# Number of packages attached to this job.
		res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
			WHERE job_id = %s", self.id)

		return res.len
208
209 @property
210 def distro(self):
211 return self.build.distro
212
96cc81de
MT
213 def restart(self):
214 # Copy the job and let it build again
215 return self.backend.jobs.create(self.build, self.arch,
216 test=self.test, superseeds=self)
217
	def get_superseeded_by(self):
		"""Returns the job that superseeded this one, or None."""
		if self.data.superseeded_by:
			return self.backend.jobs.get_by_id(self.data.superseeded_by)

	def set_superseeded_by(self, superseeded_by):
		assert isinstance(superseeded_by, self.__class__)

		self._set_attribute("superseeded_by", superseeded_by.id)
		# NOTE(review): assumes assigning through the lazy_property just
		# refreshes the cached value without re-invoking this setter
		# recursively - confirm against the lazy_property implementation.
		self.superseeded_by = superseeded_by

	superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
229
	def delete(self):
		# Only flags the job for deletion; the actual cleanup is
		# performed later (see remove()).
		self._set_attribute("delete", True)

	def remove(self):
		"""
		Removes a job from the database
		"""
		# Drop all dependent rows first to satisfy foreign keys.
		self.__remove_buildroots()
		self.__remove_history()
		self.__remove_packages()
		self.__remove_logfiles()

		# Delete the job itself.
		self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
244
	def __remove_buildroots(self):
		"""
		Removes all buildroots.
		"""
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

	def __remove_history(self):
		"""
		Removes all references in the history to this build job.
		"""
		self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)

	def __remove_packages(self):
		"""
		Deletes all uploaded files from the job.
		"""
		# NOTE(review): "packages" is not defined in this chunk -
		# presumably a property provided elsewhere (cf. __iter__); confirm.
		for pkg in self.packages:
			pkg.delete()

		self.db.execute("DELETE FROM jobs_packages WHERE job_id = %s", self.id)

	def __remove_logfiles(self):
		# Log files are not unlinked here; their paths are queued for
		# asynchronous deletion.
		for logfile in self.logfiles:
			self.db.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile.path)
269
2a1e9ce2
MT
270 ## Logging stuff
271
272 def log(self, action, user=None, state=None, builder=None, test_job=None):
273 user_id = None
274 if user:
275 user_id = user.id
276
277 builder_id = None
278 if builder:
279 builder_id = builder.id
280
281 test_job_id = None
282 if test_job:
283 test_job_id = test_job.id
284
285 self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
286 time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
287 self.id, action, state, user_id, builder_id, test_job_id)
288
289 def get_log(self, limit=None, offset=None, user=None):
290 query = "SELECT * FROM jobs_history"
291
292 conditions = ["job_id = %s",]
293 args = [self.id,]
294
295 if user:
296 conditions.append("user_id = %s")
297 args.append(user.id)
298
299 if conditions:
300 query += " WHERE %s" % " AND ".join(conditions)
301
302 query += " ORDER BY time DESC"
303
304 if limit:
305 if offset:
306 query += " LIMIT %s,%s"
307 args += [offset, limit,]
308 else:
309 query += " LIMIT %s"
310 args += [limit,]
311
312 entries = []
313 for entry in self.db.query(query, *args):
314 entry = logs.JobLogEntry(self.backend, entry)
315 entries.append(entry)
316
317 return entries
318
	@property
	def uuid(self):
		# Stable external identifier of this job.
		return self.data.uuid

	@property
	def test(self):
		# True if this is a test job (rebuild verification).
		return self.data.test

	@property
	def build_id(self):
		# Foreign key of the build this job belongs to.
		return self.data.build_id

	@lazy_property
	def build(self):
		# The Build object this job belongs to (fetched once, then cached).
		return self.backend.builds.get_by_id(self.build_id)
334
335 @property
336 def related_jobs(self):
337 ret = []
338
339 for job in self.build.jobs:
340 if job == self:
341 continue
342
343 ret.append(job)
344
345 return ret
346
347 @property
348 def pkg(self):
349 return self.build.pkg
350
351 @property
352 def name(self):
353 return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)
354
355 @property
356 def size(self):
357 return sum((p.size for p in self.packages))
358
359 @lazy_property
360 def rank(self):
361 """
362 Returns the rank in the build queue
363 """
364 if not self.state == "pending":
365 return
366
367 res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)
368
369 if res:
370 return res.rank
371
372 def is_running(self):
373 """
374 Returns True if job is in a running state.
375 """
376 return self.state in ("pending", "dispatching", "running", "uploading")
377
378 def get_state(self):
379 return self.data.state
380
381 def set_state(self, state, user=None, log=True):
382 # Nothing to do if the state remains.
383 if not self.state == state:
384 self.db.execute("UPDATE jobs SET state = %s WHERE id = %s", state, self.id)
385
386 # Log the event.
387 if log and not state == "new":
388 self.log("state_change", state=state, user=user)
389
390 # Update cache.
391 if self._data:
392 self._data["state"] = state
393
394 # Always clear the message when the status is changed.
395 self.update_message(None)
396
397 # Update some more informations.
398 if state == "dispatching":
399 # Set start time.
400 self.db.execute("UPDATE jobs SET time_started = NOW(), time_finished = NULL \
401 WHERE id = %s", self.id)
402
403 elif state == "pending":
6990cac2 404 self.db.execute("UPDATE jobs SET time_started = NULL, \
2a1e9ce2
MT
405 time_finished = NULL WHERE id = %s", self.id)
406
407 elif state in ("aborted", "dependency_error", "finished", "failed"):
408 # Set finish time and reset builder..
409 self.db.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", self.id)
410
411 # Send messages to the user.
412 if state == "finished":
413 self.send_finished_message()
414
415 elif state == "failed":
416 # Remove all package files if a job is set to failed state.
417 self.__delete_packages()
418
419 self.send_failed_message()
420
421 # Automatically update the state of the build (not on test builds).
4f90cf84 422 if not self.test:
2a1e9ce2
MT
423 self.build.auto_update_state()
424
425 state = property(get_state, set_state)
426
427 @property
428 def message(self):
429 return self.data.message
430
431 def update_message(self, msg):
432 self.db.execute("UPDATE jobs SET message = %s WHERE id = %s",
433 msg, self.id)
434
435 if self._data:
436 self._data["message"] = msg
437
	def get_builder(self):
		"""Returns the builder this job was assigned to, or None."""
		if self.data.builder_id:
			return self.backend.builders.get_by_id(self.data.builder_id)

	def set_builder(self, builder, user=None):
		self.db.execute("UPDATE jobs SET builder_id = %s WHERE id = %s",
			builder.id, self.id)

		# Update cache.
		if self._data:
			self._data["builder_id"] = builder.id

		# NOTE(review): assumes lazy_property caches under the "_builder"
		# attribute name - confirm against the lazy_property implementation.
		self._builder = builder

		# Log the event.
		if user:
			self.log("builder_assigned", builder=builder, user=user)

	builder = lazy_property(get_builder, set_builder)
457
	@property
	def arch(self):
		# Architecture this job builds for (e.g. "x86_64").
		return self.data.arch

	@property
	def duration(self):
		"""Runtime of the job in seconds; 0 if it has not started yet."""
		if not self.time_started:
			return 0

		if self.time_finished:
			delta = self.time_finished - self.time_started
		else:
			# Job is still running - measure against the current time.
			# NOTE(review): utcnow() is naive; assumes the database
			# timestamps are naive UTC as well - confirm.
			delta = datetime.datetime.utcnow() - self.time_started

		return delta.total_seconds()

	@property
	def time_created(self):
		return self.data.time_created

	@property
	def time_started(self):
		return self.data.time_started

	@property
	def time_finished(self):
		return self.data.time_finished
485
486 @property
487 def expected_runtime(self):
488 """
489 Returns the estimated time and stddev, this job takes to finish.
490 """
491 # Get the average build time.
492 build_times = self.backend.builds.get_build_times_by_arch(self.arch,
493 name=self.pkg.name)
494
495 # If there is no statistical data, we cannot estimate anything.
496 if not build_times:
497 return None, None
498
499 return build_times.average, build_times.stddev
500
501 @property
502 def eta(self):
503 expected_runtime, stddev = self.expected_runtime
504
505 if expected_runtime:
506 return expected_runtime - int(self.duration), stddev
507
2a1e9ce2
MT
508 def get_pkg_by_uuid(self, uuid):
509 pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
510 JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
511 WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
512 self.id, uuid)
513
514 if pkg:
515 pkg.job = self
516 return pkg
517
518 @lazy_property
519 def logfiles(self):
520 logfiles = []
521
522 for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
523 log = logs.LogFile(self.backend, log.id)
524 log._job = self
525
526 logfiles.append(log)
527
528 return logfiles
529
	def add_file(self, filename):
		"""
		Add the specified file to this job.

		The file is copied to the right directory by this function.
		Dispatches on the extension: ".log" files become log files,
		package files (PACKAGE_EXTENSION) become job packages.
		"""
		assert os.path.exists(filename)

		if filename.endswith(".log"):
			self._add_file_log(filename)

		elif filename.endswith(".%s" % PACKAGE_EXTENSION):
			# It is not allowed to upload packages on test builds.
			if self.test:
				return

			self._add_file_package(filename)
547
548 def _add_file_log(self, filename):
549 """
550 Attach a log file to this job.
551 """
552 target_dirname = os.path.join(self.build.path, "logs")
553
4f90cf84 554 if self.test:
2a1e9ce2
MT
555 i = 1
556 while True:
557 target_filename = os.path.join(target_dirname,
6990cac2 558 "test.%s.%s.%s.log" % (self.arch, i, self.uuid))
2a1e9ce2
MT
559
560 if os.path.exists(target_filename):
561 i += 1
562 else:
563 break
564 else:
565 target_filename = os.path.join(target_dirname,
6990cac2 566 "build.%s.%s.log" % (self.arch, self.uuid))
2a1e9ce2
MT
567
568 # Make sure the target directory exists.
569 if not os.path.exists(target_dirname):
570 os.makedirs(target_dirname)
571
572 # Calculate a SHA512 hash from that file.
573 f = open(filename, "rb")
574 h = hashlib.sha512()
575 while True:
576 buf = f.read(BUFFER_SIZE)
577 if not buf:
578 break
579
580 h.update(buf)
581 f.close()
582
583 # Copy the file to the final location.
584 shutil.copy2(filename, target_filename)
585
586 # Create an entry in the database.
587 self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
588 VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
589 os.path.getsize(target_filename), h.hexdigest())
590
591 def _add_file_package(self, filename):
592 # Open package (creates entry in the database)
593 pkg = self.backend.packages.create(filename)
594
595 # Move package to the build directory.
596 pkg.move(os.path.join(self.build.path, self.arch))
597
598 # Attach the package to this job.
599 self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
600 self.id, pkg.id)
601
	def get_aborted_state(self):
		# The state the job was in before it got aborted.
		return self.data.aborted_state

	def set_aborted_state(self, state):
		self._set_attribute("aborted_state", state)

	aborted_state = property(get_aborted_state, set_aborted_state)
609
610 @property
611 def message_recipients(self):
612 l = []
613
614 # Add all people watching the build.
615 l += self.build.message_recipients
616
617 # Add the package maintainer on release builds.
618 if self.build.type == "release":
619 maint = self.pkg.maintainer
620
621 if isinstance(maint, users.User):
622 l.append("%s <%s>" % (maint.realname, maint.email))
623 elif maint:
624 l.append(maint)
625
626 # XXX add committer and commit author.
627
628 # Add the owner of the scratch build on scratch builds.
629 elif self.build.type == "scratch" and self.build.user:
630 l.append("%s <%s>" % \
631 (self.build.user.realname, self.build.user.email))
632
633 return set(l)
634
635 def save_buildroot(self, pkgs):
6990cac2
MT
636 # Cleanup old stuff first (for rebuilding packages)
637 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
2a1e9ce2
MT
638
639 for pkg_name, pkg_uuid in pkgs:
6990cac2
MT
640 self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
641 VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)
	@lazy_property
	def buildroot(self):
		"""Returns the stored build root as a list of
		(pkg_name, pkg_uuid, Package-or-None) tuples, sorted by name."""
		rows = self.db.query("SELECT * FROM jobs_buildroots \
			WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

		pkgs = []
		for row in rows:
			# Search for this package in the packages table.
			pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
			pkgs.append((row.pkg_name, row.pkg_uuid, pkg))

		return pkgs
655
656 def send_finished_message(self):
657 # Send no finished mails for test jobs.
4f90cf84 658 if self.test:
2a1e9ce2
MT
659 return
660
661 logging.debug("Sending finished message for job %s to %s" % \
662 (self.name, ", ".join(self.message_recipients)))
663
664 info = {
665 "build_name" : self.name,
666 "build_host" : self.builder.name,
667 "build_uuid" : self.uuid,
668 }
669
670 self.backend.messages.send_to_all(self.message_recipients,
671 MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
672
673 def send_failed_message(self):
674 logging.debug("Sending failed message for job %s to %s" % \
675 (self.name, ", ".join(self.message_recipients)))
676
677 build_host = "--"
678 if self.builder:
679 build_host = self.builder.name
680
681 info = {
682 "build_name" : self.name,
683 "build_host" : build_host,
684 "build_uuid" : self.uuid,
685 }
686
687 self.backend.messages.send_to_all(self.message_recipients,
688 MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
689
690 def set_start_time(self, start_not_before):
691 self._set_attribute("start_not_before", start_not_before)
692
693 def schedule(self, type, start_time=None, user=None):
694 assert type in ("rebuild", "test")
695
696 if type == "rebuild":
697 if self.state == "finished":
698 return
699
96cc81de
MT
700 job = self.restart()
701 job.set_start_time(start_time)
2a1e9ce2
MT
702
703 # Log the event.
704 self.log("schedule_rebuild", user=user)
705
706 elif type == "test":
707 if not self.state == "finished":
708 return
709
710 # Create a new job with same build and arch.
96cc81de 711 job = self.create(self.backend, self.build, self.arch, test=True)
2a1e9ce2
MT
712 job.set_start_time(start_time)
713
714 # Log the event.
715 self.log("schedule_test_job", test_job=job, user=user)
716
717 return job
718
	def schedule_test(self, start_not_before=None, user=None):
		"""Deprecated wrapper for schedule("test")."""
		# XXX to be removed
		return self.schedule("test", start_time=start_not_before, user=user)

	def schedule_rebuild(self, start_not_before=None, user=None):
		"""Deprecated wrapper for schedule("rebuild")."""
		# XXX to be removed
		return self.schedule("rebuild", start_time=start_not_before, user=user)
726
727 def get_build_repos(self):
728 """
729 Returns a list of all repositories that should be used when
730 building this job.
731 """
732 repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
733 self.id)
734
735 if not repo_ids:
736 return self.distro.get_build_repos()
737
738 repos = []
739 for repo in self.distro.repositories:
740 if repo.id in [r.id for r in repo_ids]:
741 repos.append(repo)
742
743 return repos or self.distro.get_build_repos()
744
745 def get_repo_config(self):
746 """
747 Get repository configuration file that is sent to the builder.
748 """
749 confs = []
750
751 for repo in self.get_build_repos():
752 confs.append(repo.get_conf())
753
754 return "\n\n".join(confs)
755
756 def get_config(self):
757 """
758 Get configuration file that is sent to the builder.
759 """
760 confs = []
761
762 # Add the distribution configuration.
763 confs.append(self.distro.get_config())
764
765 # Then add all repositories for this build.
766 confs.append(self.get_repo_config())
767
768 return "\n\n".join(confs)
769
	def resolvdep(self):
		"""
		Tries to resolve the build dependencies of this job's source
		package; moves the job to "pending" on success or to
		"dependency_error" with a message on failure.
		"""
		config = pakfire.config.Config(files=["general.conf"])
		config.parse(self.get_config())

		# The filename of the source file.
		filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
		assert os.path.exists(filename), filename

		# Create a new pakfire instance with the configuration for
		# this build.
		p = pakfire.PakfireServer(config=config, arch=self.arch)

		# Try to solve the build dependencies.
		try:
			solver = p.resolvdep(filename)

		# Catch dependency errors and log the problem string.
		except DependencyError, e:
			self.state = "dependency_error"
			self.update_message(e)

		else:
			# If the build dependencies can be resolved, we set the build in
			# pending state.
			if solver.status is True:
				# Never resurrect a job that has already failed.
				if self.state in ("failed",):
					return

				self.state = "pending"