]> git.ipfire.org Git - pbs.git/blob - src/buildservice/jobs.py
jobs: Builders with too many jobs are not candidates for building
[pbs.git] / src / buildservice / jobs.py
1 #!/usr/bin/python
2
3 import datetime
4 import hashlib
5 import logging
6 import os
7 import shutil
8 import uuid
9
10 import pakfire
11 import pakfire.config
12
13 log = logging.getLogger("builds")
14 log.propagate = 1
15
16 from . import arches
17 from . import base
18 from . import logs
19 from . import users
20
21 from .constants import *
22 from .decorators import *
23
class Jobs(base.Object):
	"""Collection interface to all build jobs in the database."""

	def _get_job(self, query, *args):
		"""Run a query that returns at most one row and wrap it as Job."""
		row = self.db.get(query, *args)

		if row:
			return Job(self.backend, row.id, data=row)

	def _get_jobs(self, query, *args):
		"""Run a query and lazily yield one Job per result row."""
		for row in self.db.query(query, *args):
			yield Job(self.backend, row.id, data=row)

	def create(self, build, arch, test=False, superseeds=None):
		"""
		Insert a new job for the given build and architecture and
		return it. If superseeds is given, that job is marked as
		replaced by the new one.
		"""
		job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
			VALUES(%s, %s, %s, %s) RETURNING *", "%s" % uuid.uuid4(), build.id, arch, test)
		job.log("created")

		# Seed the Job's cached Build object so it is not re-fetched.
		job.build = build

		# Mark if the new job superseeds some other job
		if superseeds:
			superseeds.superseeded_by = job

		return job

	def get_by_id(self, id):
		"""Fetch a single job by its database ID."""
		return self._get_job("SELECT * FROM jobs WHERE id = %s", id)

	def get_by_uuid(self, uuid):
		"""Fetch a single job by its UUID."""
		return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)

	def get_active(self, limit=None):
		"""Return jobs that have started but not yet finished."""
		return self._get_jobs("SELECT jobs.* FROM jobs \
			WHERE time_started IS NOT NULL AND time_finished IS NULL \
			ORDER BY time_started LIMIT %s", limit)

	def get_recently_ended(self, limit=None):
		"""Return finished jobs, most recently finished first."""
		return self._get_jobs("SELECT jobs.* FROM jobs \
			WHERE time_finished IS NOT NULL ORDER BY time_finished DESC LIMIT %s", limit)

	def restart_failed(self):
		"""
		Restart all build jobs that failed more than 72 hours ago
		(unless their build is broken).
		"""
		failed_jobs = self._get_jobs("SELECT jobs.* FROM jobs \
			JOIN builds ON builds.id = jobs.build_id \
			WHERE \
				jobs.type = 'build' AND \
				jobs.state = 'failed' AND \
				NOT builds.state = 'broken' AND \
				jobs.time_finished < NOW() - '72 hours'::interval \
			ORDER BY \
				CASE \
					WHEN jobs.type = 'build' THEN 0 \
					WHEN jobs.type = 'test' THEN 1 \
				END, \
				builds.priority DESC, jobs.time_created ASC")

		# Restart the job
		for failed_job in failed_jobs:
			failed_job.restart()
88
89
class Job(base.DataObject):
	"""A single build job: one architecture of one build."""

	# Database table backing this object.
	table = "jobs"
92
93 def __str__(self):
94 return "<%s id=%s %s>" % (self.__class__.__name__, self.id, self.name)
95
96 def __eq__(self, other):
97 if isinstance(other, self.__class__):
98 return self.id == other.id
99
	def __lt__(self, other):
		# Queue ordering: regular builds sort before test builds;
		# jobs of the same build sort by architecture priority;
		# otherwise the older job comes first.
		if isinstance(other, self.__class__):
			# NOTE(review): the reverse case (self.test and not
			# other.test) deliberately falls through to the
			# comparisons below.
			if not self.test and other.test:
				return True

			if self.build == other.build:
				return arches.priority(self.arch) < arches.priority(other.arch)

			return self.time_created < other.time_created
109
110 def __iter__(self):
111 packages = self.backend.packages._get_packages("SELECT packages.* FROM jobs_packages \
112 LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
113 WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id)
114
115 return iter(packages)
116
117 def __nonzero__(self):
118 return True
119
120 def __len__(self):
121 res = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
122 WHERE job_id = %s", self.id)
123
124 return res.len
125
126 @property
127 def uuid(self):
128 return self.data.uuid
129
130 @property
131 def name(self):
132 return "%s-%s.%s" % (self.pkg.name, self.pkg.friendly_version, self.arch)
133
134 @property
135 def build_id(self):
136 return self.data.build_id
137
138 @lazy_property
139 def build(self):
140 return self.backend.builds.get_by_id(self.build_id)
141
142 @property
143 def test(self):
144 return self.data.test
145
146 @property
147 def related_jobs(self):
148 ret = []
149
150 for job in self.build.jobs:
151 if job == self:
152 continue
153
154 ret.append(job)
155
156 return ret
157
158 @property
159 def pkg(self):
160 return self.build.pkg
161
	@property
	def size(self):
		# Total size of all packages of this job.
		# NOTE(review): self.packages is not defined anywhere in this
		# file - presumably provided by a base class or elsewhere;
		# confirm it exists (iterating self also yields the packages).
		return sum((p.size for p in self.packages))
165
166 @lazy_property
167 def rank(self):
168 """
169 Returns the rank in the build queue
170 """
171 if not self.state == "pending":
172 return
173
174 res = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)
175
176 if res:
177 return res.rank
178
179 @property
180 def distro(self):
181 return self.build.distro
182
183 def get_superseeded_by(self):
184 if self.data.superseeded_by:
185 return self.backend.jobs.get_by_id(self.data.superseeded_by)
186
187 def set_superseeded_by(self, superseeded_by):
188 assert isinstance(superseeded_by, self.__class__)
189
190 self._set_attribute("superseeded_by", superseeded_by.id)
191
192 superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
193
194 def start(self, builder):
195 """
196 Starts this job on builder
197 """
198 self.builder = builder
199
200 # Start to dispatch the build job
201 self.state = "dispatching"
202
203 def running(self):
204 self.state = "running"
205
206 # Set start time
207 self.time_started = datetime.datetime.utcnow()
208 self.time_finished = None
209
210 def finished(self):
211 self.state = "finished"
212
213 # Log end time
214 self.time_finished = datetime.datetime.utcnow()
215
216 # Notify users
217 self.send_finished_message()
218
219 def failed(self, message):
220 self.state = "failed"
221 self.message = message
222
223 # Log end time
224 self.time_finished = datetime.datetime.utcnow()
225
226 # Notify users
227 self.send_failed_message()
228
229 def restart(self, test=None, start_not_before=None):
230 # Copy the job and let it build again
231 job = self.backend.jobs.create(self.build, self.arch,
232 test=test or self.test, superseeds=self)
233
234 if start_not_before:
235 job.start_not_before = start_not_before
236
237 return job
238
	def delete(self):
		"""
		Deletes a job from the database.

		Removes all dependent rows (buildroot, history, packages,
		log files) first; the order matters because of foreign keys.
		"""
		# Remove the buildroot
		self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

		# Remove the history
		self.db.execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)

		# Delete all packages (the link row first, then the package
		# object itself).
		for pkg in self:
			self.db.execute("DELETE FROM jobs_packages \
				WHERE job_id = %s AND pkg_id = %s", self.id, pkg.id)
			pkg.delete()

		# Remove all logfiles from disk, then their database rows.
		for logfile in self.logfiles:
			self.backend.delete_file(os.path.join(PACKAGES_DIR, logfile.path))

		self.db.execute("DELETE FROM logfiles WHERE job_id = %s", self.id)

		# Delete the job itself.
		self.db.execute("DELETE FROM jobs WHERE id = %s", self.id)
263
264 ## Logging stuff
265
266 def log(self, action, user=None, state=None, builder=None, test_job=None):
267 user_id = None
268 if user:
269 user_id = user.id
270
271 builder_id = None
272 if builder:
273 builder_id = builder.id
274
275 test_job_id = None
276 if test_job:
277 test_job_id = test_job.id
278
279 self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
280 time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
281 self.id, action, state, user_id, builder_id, test_job_id)
282
283 def get_log(self, limit=None, offset=None, user=None):
284 query = "SELECT * FROM jobs_history"
285
286 conditions = ["job_id = %s",]
287 args = [self.id,]
288
289 if user:
290 conditions.append("user_id = %s")
291 args.append(user.id)
292
293 if conditions:
294 query += " WHERE %s" % " AND ".join(conditions)
295
296 query += " ORDER BY time DESC"
297
298 if limit:
299 if offset:
300 query += " LIMIT %s,%s"
301 args += [offset, limit,]
302 else:
303 query += " LIMIT %s"
304 args += [limit,]
305
306 entries = []
307 for entry in self.db.query(query, *args):
308 entry = logs.JobLogEntry(self.backend, entry)
309 entries.append(entry)
310
311 return entries
312
313 def is_running(self):
314 """
315 Returns True if job is in a running state.
316 """
317 return self.state in ("pending", "dispatching", "running", "uploading")
318
319 def get_state(self):
320 return self.data.state
321
322 def set_state(self, state):
323 self._set_attribute("state", state)
324
325 # Automatically update the state of the build (not on test builds)
326 if not self.test:
327 self.build.auto_update_state()
328
329 state = property(get_state, set_state)
330
331 def set_message(self, message):
332 if message:
333 message = "%s" % message
334
335 self._set_attribute("message", message)
336
337 message = property(lambda s: s.data.message, set_message)
338
339 def get_builder(self):
340 if self.data.builder_id:
341 return self.backend.builders.get_by_id(self.data.builder_id)
342
343 def set_builder(self, builder, user=None):
344 self._set_attribute("builder_id", builder.id)
345
346 # Log the event.
347 if user:
348 self.log("builder_assigned", builder=builder, user=user)
349
350 builder = lazy_property(get_builder, set_builder)
351
352 @property
353 def candidate_builders(self):
354 """
355 Returns all active builders that could build this job
356 """
357 builders = self.backend.builders.get_for_arch(self.arch)
358
359 # Remove all builders that are not available
360 builders = (b for b in builders if b.enabled and b.is_online())
361
362 # Remove all builders that have too many jobs
363 builders = (b for b in builders if not b.too_many_jobs)
364
365 # Sort them by the fastest builder first
366 return sorted(builders, key=lambda b: -b.performance_index)
367
368 @property
369 def designated_builder(self):
370 """
371 Returns the fastest candidate builder builder
372 """
373 if self.candidate_builders:
374 return self.candidate_builders[0]
375
376 @property
377 def arch(self):
378 return self.data.arch
379
380 @property
381 def duration(self):
382 if not self.time_started:
383 return 0
384
385 if self.time_finished:
386 delta = self.time_finished - self.time_started
387 else:
388 delta = datetime.datetime.utcnow() - self.time_started
389
390 return delta.total_seconds()
391
392 @property
393 def time_created(self):
394 return self.data.time_created
395
396 def set_time_started(self, time_started):
397 self._set_attribute("time_started", time_started)
398
399 time_started = property(lambda s: s.data.time_started, set_time_started)
400
401 def set_time_finished(self, time_finished):
402 self._set_attribute("time_finished", time_finished)
403
404 time_finished = property(lambda s: s.data.time_finished, set_time_finished)
405
406 def set_start_not_before(self, start_not_before):
407 self._set_attribute("start_not_before", start_not_before)
408
409 start_not_before = property(lambda s: s.data.start_not_before, set_start_not_before)
410
411 def get_pkg_by_uuid(self, uuid):
412 pkg = self.backend.packages._get_package("SELECT packages.id FROM packages \
413 JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
414 WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
415 self.id, uuid)
416
417 if pkg:
418 pkg.job = self
419 return pkg
420
421 @lazy_property
422 def logfiles(self):
423 logfiles = []
424
425 for log in self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id):
426 log = logs.LogFile(self.backend, log.id)
427 log._job = self
428
429 logfiles.append(log)
430
431 return logfiles
432
433 def add_file(self, filename):
434 """
435 Add the specified file to this job.
436
437 The file is copied to the right directory by this function.
438 """
439 assert os.path.exists(filename)
440
441 if filename.endswith(".log"):
442 self._add_file_log(filename)
443
444 elif filename.endswith(".%s" % PACKAGE_EXTENSION):
445 # It is not allowed to upload packages on test builds.
446 if self.test:
447 return
448
449 self._add_file_package(filename)
450
451 def _add_file_log(self, filename):
452 """
453 Attach a log file to this job.
454 """
455 target_dirname = os.path.join(self.build.path, "logs")
456
457 if self.test:
458 i = 1
459 while True:
460 target_filename = os.path.join(target_dirname,
461 "test.%s.%s.%s.log" % (self.arch, i, self.uuid))
462
463 if os.path.exists(target_filename):
464 i += 1
465 else:
466 break
467 else:
468 target_filename = os.path.join(target_dirname,
469 "build.%s.%s.log" % (self.arch, self.uuid))
470
471 # Make sure the target directory exists.
472 if not os.path.exists(target_dirname):
473 os.makedirs(target_dirname)
474
475 # Calculate a SHA512 hash from that file.
476 f = open(filename, "rb")
477 h = hashlib.sha512()
478 while True:
479 buf = f.read(BUFFER_SIZE)
480 if not buf:
481 break
482
483 h.update(buf)
484 f.close()
485
486 # Copy the file to the final location.
487 shutil.copy2(filename, target_filename)
488
489 # Create an entry in the database.
490 self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
491 VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
492 os.path.getsize(target_filename), h.hexdigest())
493
494 def _add_file_package(self, filename):
495 # Open package (creates entry in the database)
496 pkg = self.backend.packages.create(filename)
497
498 # Move package to the build directory.
499 pkg.move(os.path.join(self.build.path, self.arch))
500
501 # Attach the package to this job.
502 self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
503 self.id, pkg.id)
504
505 def get_aborted_state(self):
506 return self.data.aborted_state
507
508 def set_aborted_state(self, state):
509 self._set_attribute("aborted_state", state)
510
511 aborted_state = property(get_aborted_state, set_aborted_state)
512
513 @property
514 def message_recipients(self):
515 l = []
516
517 # Add all people watching the build.
518 l += self.build.message_recipients
519
520 # Add the package maintainer on release builds.
521 if self.build.type == "release":
522 maint = self.pkg.maintainer
523
524 if isinstance(maint, users.User):
525 l.append("%s <%s>" % (maint.realname, maint.email))
526 elif maint:
527 l.append(maint)
528
529 # XXX add committer and commit author.
530
531 # Add the owner of the scratch build on scratch builds.
532 elif self.build.type == "scratch" and self.build.user:
533 l.append("%s <%s>" % \
534 (self.build.user.realname, self.build.user.email))
535
536 return set(l)
537
538 def save_buildroot(self, pkgs):
539 # Cleanup old stuff first (for rebuilding packages)
540 self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)
541
542 for pkg_name, pkg_uuid in pkgs:
543 self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
544 VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)
545
546 @lazy_property
547 def buildroot(self):
548 rows = self.db.query("SELECT * FROM jobs_buildroots \
549 WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)
550
551 pkgs = []
552 for row in rows:
553 # Search for this package in the packages table.
554 pkg = self.backend.packages.get_by_uuid(row.pkg_uuid)
555 pkgs.append((row.pkg_name, row.pkg_uuid, pkg))
556
557 return pkgs
558
559 def send_finished_message(self):
560 # Send no finished mails for test jobs.
561 if self.test:
562 return
563
564 logging.debug("Sending finished message for job %s to %s" % \
565 (self.name, ", ".join(self.message_recipients)))
566
567 info = {
568 "build_name" : self.name,
569 "build_host" : self.builder.name,
570 "build_uuid" : self.uuid,
571 }
572
573 self.backend.messages.send_to_all(self.message_recipients,
574 MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
575
576 def send_failed_message(self):
577 logging.debug("Sending failed message for job %s to %s" % \
578 (self.name, ", ".join(self.message_recipients)))
579
580 build_host = "--"
581 if self.builder:
582 build_host = self.builder.name
583
584 info = {
585 "build_name" : self.name,
586 "build_host" : build_host,
587 "build_uuid" : self.uuid,
588 }
589
590 self.backend.messages.send_to_all(self.message_recipients,
591 MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
592
593 def get_build_repos(self):
594 """
595 Returns a list of all repositories that should be used when
596 building this job.
597 """
598 repo_ids = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
599 self.id)
600
601 if not repo_ids:
602 return self.distro.get_build_repos()
603
604 repos = []
605 for repo in self.distro.repositories:
606 if repo.id in [r.id for r in repo_ids]:
607 repos.append(repo)
608
609 return repos or self.distro.get_build_repos()
610
611 def get_config(self, local=False):
612 """
613 Get configuration file that is sent to the builder.
614 """
615 confs = []
616
617 # Add the distribution configuration.
618 confs.append(self.distro.get_config())
619
620 # Then add all repositories for this build.
621 for repo in self.get_build_repos():
622 conf = repo.get_conf(local=local)
623 confs.append(conf)
624
625 return "\n\n".join(confs)
626
627 def set_dependency_check_succeeded(self, value):
628 self._set_attribute("dependency_check_succeeded", value)
629 self._set_attribute("dependency_check_at", datetime.datetime.utcnow())
630
631 # Reset the message
632 if value is True:
633 self.message = None
634
635 dependency_check_succeeded = property(
636 lambda s: s.data.dependency_check_succeeded,
637 set_dependency_check_succeeded)
638
	def resolvdep(self):
		"""
		Check whether the build dependencies of this job can be
		resolved and store the result in dependency_check_succeeded.
		"""
		log.info("Processing dependencies for %s..." % self)

		# Build a pakfire configuration from the distribution and
		# the repositories of this job.
		config = pakfire.config.Config(files=["general.conf"])
		config.parse(self.get_config(local=True))

		# The filename of the source file.
		filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
		assert os.path.exists(filename), filename

		# Create a new pakfire instance with the configuration for
		# this build.
		p = pakfire.PakfireServer(config=config, arch=self.arch)

		# Try to solve the build dependencies.
		try:
			solver = p.resolvdep(filename)

		# Catch dependency errors and log the problem string.
		# NOTE(review): Python 2-only "except E, e" syntax;
		# DependencyError is presumably imported via the star
		# imports above - confirm.
		except DependencyError, e:
			self.dependency_check_succeeded = False
			self.message = e

		# The dependency check has succeeded
		else:
			self.dependency_check_succeeded = True