13 log
= logging
.getLogger("builds")
21 from .constants
import *
22 from .decorators
import *
24 class Jobs(base
.Object
):
25 def _get_job(self
, query
, *args
):
26 res
= self
.db
.get(query
, *args
)
29 return Job(self
.backend
, res
.id, data
=res
)
31 def _get_jobs(self
, query
, *args
):
32 res
= self
.db
.query(query
, *args
)
35 yield Job(self
.backend
, row
.id, data
=row
)
37 def create(self
, build
, arch
, type="build", superseeds
=None):
38 job
= self
._get
_job
("INSERT INTO jobs(uuid, type, build_id, arch, time_created) \
39 VALUES(%s, %s, %s, %s, NOW()) RETURNING *", "%s" % uuid
.uuid4(), type, build
.id, arch
)
42 # Set cache for Build object.
45 # Mark if the new job superseeds some other job
47 superseeds
.superseeded_by
= job
49 # Jobs are by default in state "new" and wait for being checked
50 # for dependencies. Packages that do have no build dependencies
51 # can directly be forwarded to "pending" state.
52 if not job
.pkg
.requires
:
57 def get_by_id(self
, id, data
=None):
58 return Job(self
.backend
, id, data
)
60 def get_by_uuid(self
, uuid
):
61 job
= self
.db
.get("SELECT id FROM jobs WHERE uuid = %s", uuid
)
64 return self
.get_by_id(job
.id)
66 def get_by_build(self
, build_id
, build
=None, type=None):
68 Get all jobs in the specifies build.
70 query
= "SELECT * FROM jobs WHERE build_id = %s"
74 query
+= " AND type = %s"
77 # Get IDs of all builds in this group.
79 for job
in self
.db
.query(query
, *args
):
80 job
= Job(self
.backend
, job
.id, job
)
82 # If the Build object was set, we set it so it won't be retrieved
83 # from the database again.
89 # Return sorted list of jobs.
92 def get_active(self
, host_id
=None, builder
=None, states
=None):
97 states
= ["dispatching", "running", "uploading"]
99 query
= "SELECT * FROM jobs WHERE state IN (%s)" % ", ".join(["%s"] * len(states
))
103 query
+= " AND builder_id = %s" % host_id
105 query
+= " ORDER BY \
107 WHEN jobs.state = 'running' THEN 0 \
108 WHEN jobs.state = 'uploading' THEN 1 \
109 WHEN jobs.state = 'dispatching' THEN 2 \
110 WHEN jobs.state = 'pending' THEN 3 \
111 WHEN jobs.state = 'new' THEN 4 \
112 END, time_started ASC"
114 return [Job(self
.backend
, j
.id, j
) for j
in self
.db
.query(query
, *args
)]
116 def get_latest(self
, arch
=None, builder
=None, limit
=None, age
=None, date
=None):
117 query
= "SELECT * FROM jobs"
120 where
= ["(state = 'finished' OR state = 'failed' OR state = 'aborted')"]
123 where
.append("arch = %s")
127 where
.append("builder_id = %s")
128 args
.append(builder
.id)
132 year
, month
, day
= date
.split("-", 2)
133 date
= datetime
.date(int(year
), int(month
), int(day
))
137 where
.append("(time_created::date = %s OR \
138 time_started::date = %s OR time_finished::date = %s)")
139 args
+= (date
, date
, date
)
142 where
.append("time_finished >= NOW() - '%s'::interval" % age
)
145 query
+= " WHERE %s" % " AND ".join(where
)
147 query
+= " ORDER BY time_finished DESC"
153 return [Job(self
.backend
, j
.id, j
) for j
in self
.db
.query(query
, *args
)]
155 def get_average_build_time(self
):
157 Returns the average build time of all finished builds from the
160 result
= self
.db
.get("SELECT AVG(time_finished - time_started) as average \
161 FROM jobs WHERE type = 'build' AND state = 'finished' AND \
162 time_finished >= NOW() - '3 months'::interval")
165 return result
.average
167 def count(self
, *states
):
168 query
= "SELECT COUNT(*) AS count FROM jobs"
172 query
+= " WHERE state IN %s"
175 jobs
= self
.db
.get(query
, *args
)
179 def restart_failed(self
, max_tries
=9):
180 jobs
= self
._get
_jobs
("SELECT jobs.* FROM jobs \
181 JOIN builds ON builds.id = jobs.build_id \
183 jobs.type = 'build' AND \
184 jobs.state = 'failed' AND \
185 jobs.tries <= %s AND \
186 NOT builds.state = 'broken' AND \
187 jobs.time_finished < NOW() - '72 hours'::interval \
190 WHEN jobs.type = 'build' THEN 0 \
191 WHEN jobs.type = 'test' THEN 1 \
193 builds.priority DESC, jobs.time_created ASC",
198 job
.set_state("new", log
=False)
201 class Job(base
.DataObject
):
205 return "<%s id=%s %s>" % (self
.__class
__.__name
__, self
.id, self
.name
)
207 def __eq__(self
, other
):
208 if isinstance(other
, self
.__class
__):
209 return self
.id == other
.id
211 def __lt__(self
, other
):
212 if isinstance(other
, self
.__class
__):
213 if not self
.test
and other
.test
:
216 if self
.build
== other
.build
:
217 return arches
.priority(self
.arch
) < arches
.priority(other
.arch
)
219 return self
.time_created
< other
.time_created
222 packages
= self
.backend
.packages
._get
_packages
("SELECT packages.* FROM jobs_packages \
223 LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
224 WHERE jobs_packages.job_id = %s ORDER BY packages.name", self
.id)
226 return iter(packages
)
228 def __nonzero__(self
):
232 res
= self
.db
.get("SELECT COUNT(*) AS len FROM jobs_packages \
233 WHERE job_id = %s", self
.id)
239 return self
.build
.distro
241 def get_superseeded_by(self
):
242 if self
.data
.superseeded_by
:
243 return self
.backend
.jobs
.get_by_id(self
.data
.superseeded_by
)
245 def set_superseeded_by(self
, superseeded_by
):
246 assert isinstance(superseeded_by
, self
.__class
__)
248 self
._set
_attribute
("superseeded_by", superseeded_by
.id)
249 self
.superseeded_by
= superseeded_by
251 superseeded_by
= lazy_property(get_superseeded_by
, set_superseeded_by
)
254 self
.__delete
_buildroots
()
255 self
.__delete
_history
()
256 self
.__delete
_packages
()
257 self
.__delete
_logfiles
()
259 # Delete the job itself.
260 self
.db
.execute("DELETE FROM jobs WHERE id = %s", self
.id)
262 def __delete_buildroots(self
):
264 Removes all buildroots.
266 self
.db
.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self
.id)
268 def __delete_history(self
):
270 Removes all references in the history to this build job.
272 self
.db
.execute("DELETE FROM jobs_history WHERE job_id = %s", self
.id)
274 def __delete_packages(self
):
276 Deletes all uploaded files from the job.
278 for pkg
in self
.packages
:
281 self
.db
.execute("DELETE FROM jobs_packages WHERE job_id = %s", self
.id)
283 def __delete_logfiles(self
):
284 for logfile
in self
.logfiles
:
285 self
.db
.execute("INSERT INTO queue_delete(path) VALUES(%s)", logfile
.path
)
287 def reset(self
, user
=None):
288 self
.__delete
_buildroots
()
289 self
.__delete
_packages
()
290 self
.__delete
_history
()
291 self
.__delete
_logfiles
()
294 self
.log("reset", user
=user
)
298 def log(self
, action
, user
=None, state
=None, builder
=None, test_job
=None):
305 builder_id
= builder
.id
309 test_job_id
= test_job
.id
311 self
.db
.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
312 time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
313 self
.id, action
, state
, user_id
, builder_id
, test_job_id
)
315 def get_log(self
, limit
=None, offset
=None, user
=None):
316 query
= "SELECT * FROM jobs_history"
318 conditions
= ["job_id = %s",]
322 conditions
.append("user_id = %s")
326 query
+= " WHERE %s" % " AND ".join(conditions
)
328 query
+= " ORDER BY time DESC"
332 query
+= " LIMIT %s,%s"
333 args
+= [offset
, limit
,]
339 for entry
in self
.db
.query(query
, *args
):
340 entry
= logs
.JobLogEntry(self
.backend
, entry
)
341 entries
.append(entry
)
347 return self
.data
.uuid
351 return self
.data
.test
355 return self
.data
.build_id
359 return self
.backend
.builds
.get_by_id(self
.build_id
)
362 def related_jobs(self
):
365 for job
in self
.build
.jobs
:
375 return self
.build
.pkg
379 return "%s-%s.%s" % (self
.pkg
.name
, self
.pkg
.friendly_version
, self
.arch
)
383 return sum((p
.size
for p
in self
.packages
))
388 Returns the rank in the build queue
390 if not self
.state
== "pending":
393 res
= self
.db
.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self
.id)
398 def is_running(self
):
400 Returns True if job is in a running state.
402 return self
.state
in ("pending", "dispatching", "running", "uploading")
405 return self
.data
.state
407 def set_state(self
, state
, user
=None, log
=True):
408 # Nothing to do if the state remains.
409 if not self
.state
== state
:
410 self
.db
.execute("UPDATE jobs SET state = %s WHERE id = %s", state
, self
.id)
413 if log
and not state
== "new":
414 self
.log("state_change", state
=state
, user
=user
)
418 self
._data
["state"] = state
420 # Always clear the message when the status is changed.
421 self
.update_message(None)
423 # Update some more informations.
424 if state
== "dispatching":
426 self
.db
.execute("UPDATE jobs SET time_started = NOW(), time_finished = NULL \
427 WHERE id = %s", self
.id)
429 elif state
== "pending":
430 self
.db
.execute("UPDATE jobs SET tries = tries + 1, time_started = NULL, \
431 time_finished = NULL WHERE id = %s", self
.id)
433 elif state
in ("aborted", "dependency_error", "finished", "failed"):
434 # Set finish time and reset builder..
435 self
.db
.execute("UPDATE jobs SET time_finished = NOW() WHERE id = %s", self
.id)
437 # Send messages to the user.
438 if state
== "finished":
439 self
.send_finished_message()
441 elif state
== "failed":
442 # Remove all package files if a job is set to failed state.
443 self
.__delete
_packages
()
445 self
.send_failed_message()
447 # Automatically update the state of the build (not on test builds).
449 self
.build
.auto_update_state()
451 state
= property(get_state
, set_state
)
455 return self
.data
.message
457 def update_message(self
, msg
):
458 self
.db
.execute("UPDATE jobs SET message = %s WHERE id = %s",
462 self
._data
["message"] = msg
464 def get_builder(self
):
465 if self
.data
.builder_id
:
466 return self
.backend
.builders
.get_by_id(self
.data
.builder_id
)
468 def set_builder(self
, builder
, user
=None):
469 self
.db
.execute("UPDATE jobs SET builder_id = %s WHERE id = %s",
474 self
._data
["builder_id"] = builder
.id
476 self
._builder
= builder
480 self
.log("builder_assigned", builder
=builder
, user
=user
)
482 builder
= lazy_property(get_builder
, set_builder
)
486 return self
.data
.arch
490 if not self
.time_started
:
493 if self
.time_finished
:
494 delta
= self
.time_finished
- self
.time_started
496 delta
= datetime
.datetime
.utcnow() - self
.time_started
498 return delta
.total_seconds()
501 def time_created(self
):
502 return self
.data
.time_created
505 def time_started(self
):
506 return self
.data
.time_started
509 def time_finished(self
):
510 return self
.data
.time_finished
513 def expected_runtime(self
):
515 Returns the estimated time and stddev, this job takes to finish.
517 # Get the average build time.
518 build_times
= self
.backend
.builds
.get_build_times_by_arch(self
.arch
,
521 # If there is no statistical data, we cannot estimate anything.
525 return build_times
.average
, build_times
.stddev
529 expected_runtime
, stddev
= self
.expected_runtime
532 return expected_runtime
- int(self
.duration
), stddev
536 return self
.data
.tries
538 def get_pkg_by_uuid(self
, uuid
):
539 pkg
= self
.backend
.packages
._get
_package
("SELECT packages.id FROM packages \
540 JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
541 WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
552 for log
in self
.db
.query("SELECT id FROM logfiles WHERE job_id = %s", self
.id):
553 log
= logs
.LogFile(self
.backend
, log
.id)
560 def add_file(self
, filename
):
562 Add the specified file to this job.
564 The file is copied to the right directory by this function.
566 assert os
.path
.exists(filename
)
568 if filename
.endswith(".log"):
569 self
._add
_file
_log
(filename
)
571 elif filename
.endswith(".%s" % PACKAGE_EXTENSION
):
572 # It is not allowed to upload packages on test builds.
576 self
._add
_file
_package
(filename
)
578 def _add_file_log(self
, filename
):
580 Attach a log file to this job.
582 target_dirname
= os
.path
.join(self
.build
.path
, "logs")
587 target_filename
= os
.path
.join(target_dirname
,
588 "test.%s.%s.%s.log" % (self
.arch
, i
, self
.tries
))
590 if os
.path
.exists(target_filename
):
595 target_filename
= os
.path
.join(target_dirname
,
596 "build.%s.%s.log" % (self
.arch
, self
.tries
))
598 # Make sure the target directory exists.
599 if not os
.path
.exists(target_dirname
):
600 os
.makedirs(target_dirname
)
602 # Calculate a SHA512 hash from that file.
603 f
= open(filename
, "rb")
606 buf
= f
.read(BUFFER_SIZE
)
613 # Copy the file to the final location.
614 shutil
.copy2(filename
, target_filename
)
616 # Create an entry in the database.
617 self
.db
.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
618 VALUES(%s, %s, %s, %s)", self
.id, os
.path
.relpath(target_filename
, PACKAGES_DIR
),
619 os
.path
.getsize(target_filename
), h
.hexdigest())
621 def _add_file_package(self
, filename
):
622 # Open package (creates entry in the database)
623 pkg
= self
.backend
.packages
.create(filename
)
625 # Move package to the build directory.
626 pkg
.move(os
.path
.join(self
.build
.path
, self
.arch
))
628 # Attach the package to this job.
629 self
.db
.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
632 def get_aborted_state(self
):
633 return self
.data
.aborted_state
635 def set_aborted_state(self
, state
):
636 self
._set
_attribute
("aborted_state", state
)
    # The job state at the time the job was aborted (persisted attribute).
    aborted_state = property(get_aborted_state, set_aborted_state)
641 def message_recipients(self
):
644 # Add all people watching the build.
645 l
+= self
.build
.message_recipients
647 # Add the package maintainer on release builds.
648 if self
.build
.type == "release":
649 maint
= self
.pkg
.maintainer
651 if isinstance(maint
, users
.User
):
652 l
.append("%s <%s>" % (maint
.realname
, maint
.email
))
656 # XXX add committer and commit author.
658 # Add the owner of the scratch build on scratch builds.
659 elif self
.build
.type == "scratch" and self
.build
.user
:
660 l
.append("%s <%s>" % \
661 (self
.build
.user
.realname
, self
.build
.user
.email
))
665 def save_buildroot(self
, pkgs
):
668 for pkg_name
, pkg_uuid
in pkgs
:
669 rows
.append((self
.id, self
.tries
, pkg_uuid
, pkg_name
))
671 # Cleanup old stuff first (for rebuilding packages).
672 self
.db
.execute("DELETE FROM jobs_buildroots WHERE job_id = %s AND tries = %s",
675 self
.db
.executemany("INSERT INTO \
676 jobs_buildroots(job_id, tries, pkg_uuid, pkg_name) \
677 VALUES(%s, %s, %s, %s)", rows
)
679 def has_buildroot(self
, tries
=None):
683 res
= self
.db
.get("SELECT COUNT(*) AS num FROM jobs_buildroots \
684 WHERE jobs_buildroots.job_id = %s AND jobs_buildroots.tries = %s",
692 def get_buildroot(self
, tries
=None):
696 rows
= self
.db
.query("SELECT * FROM jobs_buildroots \
697 WHERE jobs_buildroots.job_id = %s AND jobs_buildroots.tries = %s \
698 ORDER BY pkg_name", self
.id, tries
)
702 # Search for this package in the packages table.
703 pkg
= self
.backend
.packages
.get_by_uuid(row
.pkg_uuid
)
704 pkgs
.append((row
.pkg_name
, row
.pkg_uuid
, pkg
))
708 def send_finished_message(self
):
709 # Send no finished mails for test jobs.
713 logging
.debug("Sending finished message for job %s to %s" % \
714 (self
.name
, ", ".join(self
.message_recipients
)))
717 "build_name" : self
.name
,
718 "build_host" : self
.builder
.name
,
719 "build_uuid" : self
.uuid
,
722 self
.backend
.messages
.send_to_all(self
.message_recipients
,
723 MSG_BUILD_FINISHED_SUBJECT
, MSG_BUILD_FINISHED
, info
)
725 def send_failed_message(self
):
726 logging
.debug("Sending failed message for job %s to %s" % \
727 (self
.name
, ", ".join(self
.message_recipients
)))
731 build_host
= self
.builder
.name
734 "build_name" : self
.name
,
735 "build_host" : build_host
,
736 "build_uuid" : self
.uuid
,
739 self
.backend
.messages
.send_to_all(self
.message_recipients
,
740 MSG_BUILD_FAILED_SUBJECT
, MSG_BUILD_FAILED
, info
)
742 def set_start_time(self
, start_not_before
):
743 self
._set
_attribute
("start_not_before", start_not_before
)
745 def schedule(self
, type, start_time
=None, user
=None):
746 assert type in ("rebuild", "test")
748 if type == "rebuild":
749 if self
.state
== "finished":
752 self
.set_state("new", user
=user
, log
=False)
753 self
.set_start_time(start_time
)
756 self
.log("schedule_rebuild", user
=user
)
759 if not self
.state
== "finished":
762 # Create a new job with same build and arch.
763 job
= self
.create(self
.backend
, self
.build
, self
.arch
, type="test")
764 job
.set_start_time(start_time
)
767 self
.log("schedule_test_job", test_job
=job
, user
=user
)
771 def schedule_test(self
, start_not_before
=None, user
=None):
773 return self
.schedule("test", start_time
=start_not_before
, user
=user
)
775 def schedule_rebuild(self
, start_not_before
=None, user
=None):
777 return self
.schedule("rebuild", start_time
=start_not_before
, user
=user
)
779 def get_build_repos(self
):
781 Returns a list of all repositories that should be used when
784 repo_ids
= self
.db
.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
788 return self
.distro
.get_build_repos()
791 for repo
in self
.distro
.repositories
:
792 if repo
.id in [r
.id for r
in repo_ids
]:
795 return repos
or self
.distro
.get_build_repos()
797 def get_repo_config(self
):
799 Get repository configuration file that is sent to the builder.
803 for repo
in self
.get_build_repos():
804 confs
.append(repo
.get_conf())
806 return "\n\n".join(confs
)
808 def get_config(self
):
810 Get configuration file that is sent to the builder.
814 # Add the distribution configuration.
815 confs
.append(self
.distro
.get_config())
817 # Then add all repositories for this build.
818 confs
.append(self
.get_repo_config())
820 return "\n\n".join(confs
)
823 config
= pakfire
.config
.Config(files
=["general.conf"])
824 config
.parse(self
.get_config())
826 # The filename of the source file.
827 filename
= os
.path
.join(PACKAGES_DIR
, self
.build
.pkg
.path
)
828 assert os
.path
.exists(filename
), filename
830 # Create a new pakfire instance with the configuration for
832 p
= pakfire
.PakfireServer(config
=config
, arch
=self
.arch
)
834 # Try to solve the build dependencies.
836 solver
= p
.resolvdep(filename
)
838 # Catch dependency errors and log the problem string.
839 except DependencyError
, e
:
840 self
.state
= "dependency_error"
841 self
.update_message(e
)
844 # If the build dependencies can be resolved, we set the build in
846 if solver
.status
is True:
847 if self
.state
in ("failed",):
850 self
.state
= "pending"