]> git.ipfire.org Git - pbs.git/blame - src/buildservice/jobs.py
jobs: Builders with too many jobs are not candidates for building
[pbs.git] / src / buildservice / jobs.py
CommitLineData
2a1e9ce2
MT
1#!/usr/bin/python
2
3import datetime
4import hashlib
5import logging
6import os
7import shutil
8import uuid
9
10import pakfire
11import pakfire.config
12
# Module-wide logger; all messages of this file go to "builds".
log = logging.getLogger("builds")

# Pass records on to the handlers of the ancestor loggers, too.
log.propagate = 1
15
16from . import arches
17from . import base
18from . import logs
19from . import users
20
21from .constants import *
22from .decorators import *
23
class Jobs(base.Object):
    """
    Access to the rows of the jobs table as Job objects.
    """

    def _get_job(self, query, *args):
        """
        Runs query with args through self.db.get() and wraps the
        returned row into a Job.

        Returns None when no row came back.
        """
        row = self.db.get(query, *args)
        if not row:
            return None

        return Job(self.backend, row.id, data=row)

    def _get_jobs(self, query, *args):
        """
        Generator that runs query with args and yields one Job per
        row of the result.
        """
        for row in self.db.query(query, *args):
            yield Job(self.backend, row.id, data=row)

    def create(self, build, arch, test=False, superseeds=None):
        """
        Inserts a new job for build and arch and returns it.

        If superseeds is given, that job is marked as being
        superseeded by the newly created one.
        """
        job_uuid = "%s" % uuid.uuid4()

        job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
            VALUES(%s, %s, %s, %s) RETURNING *", job_uuid, build.id, arch, test)
        job.log("created")

        # The build is already at hand, so cache it on the job
        job.build = build

        # Point the replaced job to its successor
        if superseeds:
            superseeds.superseeded_by = job

        return job

    def get_by_id(self, id):
        """
        Returns the job with the given id (or None).
        """
        query = "SELECT * FROM jobs WHERE id = %s"

        return self._get_job(query, id)

    def get_by_uuid(self, uuid):
        """
        Returns the job with the given UUID (or None).
        """
        query = "SELECT * FROM jobs WHERE uuid = %s"

        return self._get_job(query, uuid)

    def get_active(self, limit=None):
        """
        Returns the jobs that have been started but not finished,
        ordered by their start time. limit is passed on to LIMIT.
        """
        return self._get_jobs("SELECT jobs.* FROM jobs \
            WHERE time_started IS NOT NULL AND time_finished IS NULL \
            ORDER BY time_started LIMIT %s", limit)

    def get_recently_ended(self, limit=None):
        """
        Returns the finished jobs, most recently finished first.
        limit is passed on to LIMIT.
        """
        return self._get_jobs("SELECT jobs.* FROM jobs \
            WHERE time_finished IS NOT NULL ORDER BY time_finished DESC LIMIT %s", limit)

    def restart_failed(self):
        """
        Restarts all failed jobs of type 'build' that finished more
        than 72 hours ago and whose build is not in state 'broken'.
        """
        # NOTE: The query filters on jobs.type = 'build', so the 'test'
        # branch of the CASE expression can never match.
        failed_jobs = self._get_jobs("SELECT jobs.* FROM jobs \
            JOIN builds ON builds.id = jobs.build_id \
            WHERE \
                jobs.type = 'build' AND \
                jobs.state = 'failed' AND \
                NOT builds.state = 'broken' AND \
                jobs.time_finished < NOW() - '72 hours'::interval \
            ORDER BY \
                CASE \
                    WHEN jobs.type = 'build' THEN 0 \
                    WHEN jobs.type = 'test' THEN 1 \
                END, \
                builds.priority DESC, jobs.time_created ASC")

        # Create a successor for each of them
        for failed_job in failed_jobs:
            failed_job.restart()
2a1e9ce2
MT
88
89
90class Job(base.DataObject):
91 table = "jobs"
92
def __str__(self):
    """
    Short representation made of class name, id and job name.
    """
    fields = (self.__class__.__name__, self.id, self.name)

    return "<%s id=%s %s>" % fields
95
def __eq__(self, other):
    """
    Two jobs are equal when they have the same id.

    For objects of any other class NotImplemented is returned, so
    that the interpreter can try the reflected comparison and fall
    back to its default (which evaluates to "not equal").
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    return self.id == other.id
99
def __lt__(self, other):
    """
    Ordering of jobs.

    Build jobs sort before test jobs. Jobs of the same build are
    ordered by the priority of their architecture and everything
    else by creation time.

    Like before, nothing (None) is returned when other is not an
    instance of this class.
    """
    if not isinstance(other, self.__class__):
        return None

    # Only one of the two is a test job: the other one goes first.
    # This must hold in both directions, otherwise a test job could
    # be "less than" a build job and vice versa at the same time.
    if bool(self.test) != bool(other.test):
        return not self.test

    if self.build == other.build:
        return arches.priority(self.arch) < arches.priority(other.arch)

    return self.time_created < other.time_created
109
def __iter__(self):
    """
    Iterates over the packages that are attached to this job,
    ordered by package name.
    """
    return iter(self.backend.packages._get_packages(
        "SELECT packages.* FROM jobs_packages \
            LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
            WHERE jobs_packages.job_id = %s ORDER BY packages.name", self.id))
116
def __nonzero__(self):
    """
    A job is always true.

    Without this, truth testing would use __len__() and a job
    without any packages would evaluate to False.
    """
    return True

# Python 3 looks for __bool__ instead of __nonzero__
__bool__ = __nonzero__
119
def __len__(self):
    """
    Number of packages that are attached to this job.
    """
    row = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
        WHERE job_id = %s", self.id)

    return row.len
125
b27d67f0
MT
@property
def uuid(self):
    """
    The UUID of this job as stored in the database row.
    """
    row = self.data

    return row.uuid
129
@property
def name(self):
    """
    Name of the job: <package name>-<friendly version>.<arch>
    """
    pkg = self.pkg

    return "%s-%s.%s" % (pkg.name, pkg.friendly_version, self.arch)
133
@property
def build_id(self):
    """
    ID of the build this job belongs to.
    """
    row = self.data

    return row.build_id
137
@lazy_property
def build(self):
    """
    The build this job belongs to, looked up by build_id.
    """
    builds = self.backend.builds

    return builds.get_by_id(self.build_id)
141
@property
def test(self):
    """
    The value of the "test" column of this job.
    """
    row = self.data

    return row.test
145
@property
def related_jobs(self):
    """
    All jobs of the same build except this one.
    """
    return [job for job in self.build.jobs if not (job == self)]
157
@property
def pkg(self):
    """
    The package of the build this job belongs to.
    """
    build = self.build

    return build.pkg
161
@property
def size(self):
    """
    Sum of the sizes of all packages of this job.

    The packages are taken from iterating over the job itself
    (see __iter__). The class has no "packages" attribute of its
    own that could be used here.
    """
    return sum(p.size for p in self)
165
@lazy_property
def rank(self):
    """
    Returns the rank in the build queue.

    Only pending jobs have a rank. None is returned for all other
    jobs and when the job has no row in jobs_queue.
    """
    if self.state != "pending":
        return None

    row = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)

    return row.rank if row else None
178
2a1e9ce2
MT
@property
def distro(self):
    """
    The distribution of the build this job belongs to.
    """
    build = self.build

    return build.distro
182
3f516e41
MT
def get_superseeded_by(self):
    """
    Returns the job that replaced this one, or None when the
    superseeded_by column is not set.
    """
    job_id = self.data.superseeded_by
    if not job_id:
        return None

    return self.backend.jobs.get_by_id(job_id)

def set_superseeded_by(self, superseeded_by):
    """
    Stores the id of the job that replaces this one.
    """
    assert isinstance(superseeded_by, self.__class__)

    self._set_attribute("superseeded_by", superseeded_by.id)

superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
193
b27d67f0
MT
def start(self, builder):
    """
    Starts this job on builder.

    The builder is assigned first and the job is then put into
    the "dispatching" state.
    """
    self.builder = builder
    self.state = "dispatching"
202
def running(self):
    """
    Marks the job as running: sets the state, stamps the start
    time (UTC) and clears any previous end time.
    """
    self.state = "running"

    started_at = datetime.datetime.utcnow()
    self.time_started = started_at
    self.time_finished = None
209
def finished(self):
    """
    Marks the job as finished, stamps the end time (UTC) and
    sends out the "finished" message.
    """
    self.state = "finished"

    ended_at = datetime.datetime.utcnow()
    self.time_finished = ended_at

    # Let the users know
    self.send_finished_message()
218
def failed(self, message):
    """
    Marks the job as failed with the given message, stamps the
    end time (UTC) and sends out the "failed" message.
    """
    self.state = "failed"
    self.message = message

    ended_at = datetime.datetime.utcnow()
    self.time_finished = ended_at

    # Let the users know
    self.send_failed_message()
228
def restart(self, test=None, start_not_before=None):
    """
    Creates a copy of this job which superseeds it and returns
    the new job.

    test: Whether the new job is a test job. None (the default)
        inherits the setting from this job. An explicit False is
        honoured as well.
    start_not_before: If given, it is stored on the new job.
    """
    # "test or self.test" would turn an explicit False into self.test
    if test is None:
        test = self.test

    job = self.backend.jobs.create(self.build, self.arch,
        test=test, superseeds=self)

    if start_not_before:
        job.start_not_before = start_not_before

    return job
238
def delete(self):
    """
    Deletes a job from the database.

    Buildroot, history, packages and log files that belong to
    the job are removed before the job row itself.
    """
    db = self.db
    job_id = self.id

    # Buildroot and history first
    db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", job_id)
    db.execute("DELETE FROM jobs_history WHERE job_id = %s", job_id)

    # Detach every package from the job and delete it
    for package in self:
        db.execute("DELETE FROM jobs_packages \
            WHERE job_id = %s AND pkg_id = %s", job_id, package.id)
        package.delete()

    # Remove the log files from disk, then from the database
    for logfile in self.logfiles:
        path = os.path.join(PACKAGES_DIR, logfile.path)
        self.backend.delete_file(path)

    db.execute("DELETE FROM logfiles WHERE job_id = %s", job_id)

    # Finally the job itself
    db.execute("DELETE FROM jobs WHERE id = %s", job_id)
263
2a1e9ce2
MT
264 ## Logging stuff
265
def log(self, action, user=None, state=None, builder=None, test_job=None):
    """
    Adds an entry to the history of this job.

    Of user, builder and test_job only the id is stored (NULL
    when they are not given). The time is set by the database.
    """
    user_id = user.id if user else None
    builder_id = builder.id if builder else None
    test_job_id = test_job.id if test_job else None

    self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
        time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
        self.id, action, state, user_id, builder_id, test_job_id)
282
def get_log(self, limit=None, offset=None, user=None):
    """
    Returns the history of this job as a list of JobLogEntry
    objects, newest entry first.

    limit: Maximum number of entries to return.
    offset: Number of entries to skip. Like before, this is only
        honoured together with limit.
    user: If given, only entries of this user are returned.
    """
    conditions = ["job_id = %s"]
    args = [self.id]

    if user:
        conditions.append("user_id = %s")
        args.append(user.id)

    query = "SELECT * FROM jobs_history WHERE %s ORDER BY time DESC" \
        % " AND ".join(conditions)

    if limit:
        query += " LIMIT %s"
        args.append(limit)

        # PostgreSQL does not know the MySQL form "LIMIT offset,count"
        if offset:
            query += " OFFSET %s"
            args.append(offset)

    return [logs.JobLogEntry(self.backend, row)
        for row in self.db.query(query, *args)]
312
2a1e9ce2
MT
def is_running(self):
    """
    Returns True if job is in a running state.

    That is any of pending, dispatching, running or uploading.
    """
    running_states = ("pending", "dispatching", "running", "uploading")

    return self.state in running_states
318
def get_state(self):
    """
    The state of this job as stored in the database row.
    """
    row = self.data

    return row.state

def set_state(self, state):
    """
    Stores the new state.

    Unless this is a test job, the build is asked to update its
    own state afterwards.
    """
    self._set_attribute("state", state)

    if self.test:
        return

    # Automatically update the state of the build
    self.build.auto_update_state()

state = property(get_state, set_state)
330
def set_message(self, message):
    """
    Stores the message of this job.

    Anything that is true is converted to a string first. Empty
    values (like None) are stored as they are.
    """
    value = "%s" % message if message else message

    self._set_attribute("message", value)

message = property(lambda s: s.data.message, set_message)
2a1e9ce2
MT
338
def get_builder(self):
    """
    Returns the builder this job is assigned to, or None when
    builder_id is not set.
    """
    builder_id = self.data.builder_id
    if not builder_id:
        return None

    return self.backend.builders.get_by_id(builder_id)

def set_builder(self, builder, user=None):
    """
    Assigns the job to builder.

    The assignment is written to the history of the job only when
    user is given.
    """
    self._set_attribute("builder_id", builder.id)

    if not user:
        return

    # Log the event.
    self.log("builder_assigned", builder=builder, user=user)

builder = lazy_property(get_builder, set_builder)
351
@property
def candidate_builders(self):
    """
    Returns all active builders that could build this job.

    Builders for the architecture of the job are dropped when they
    are disabled, not online or have too many jobs. The remaining
    ones are sorted by performance index, highest first.
    """
    candidates = [
        builder for builder in self.backend.builders.get_for_arch(self.arch)
        if builder.enabled and builder.is_online() and not builder.too_many_jobs
    ]

    # Fastest builder first
    candidates.sort(key=lambda b: -b.performance_index)

    return candidates
367
ffd2413d
MT
@property
def designated_builder(self):
    """
    Returns the fastest candidate builder, or None when there is
    no candidate at all.
    """
    # candidate_builders is computed on every access, so read it once.
    # Reading it twice would repeat all the checks and could fail if
    # the second list turned out to be empty.
    candidates = self.candidate_builders
    if not candidates:
        return None

    return candidates[0]
375
2a1e9ce2
MT
@property
def arch(self):
    """
    The architecture of this job as stored in the database row.
    """
    row = self.data

    return row.arch
379
@property
def duration(self):
    """
    Runtime of the job in seconds.

    0 when the job has not been started. For jobs without an end
    time the time up to now (UTC) is returned.
    """
    started_at = self.time_started
    if not started_at:
        return 0

    ended_at = self.time_finished or datetime.datetime.utcnow()

    return (ended_at - started_at).total_seconds()
391
@property
def time_created(self):
    """
    The creation time of this job as stored in the database row.
    """
    row = self.data

    return row.time_created
395
b27d67f0
MT
def set_time_started(self, time_started):
    """
    Stores the time at which the job was started.
    """
    column = "time_started"

    self._set_attribute(column, time_started)

time_started = property(lambda s: s.data.time_started, set_time_started)
400
def set_time_finished(self, time_finished):
    """
    Stores the time at which the job has ended.
    """
    column = "time_finished"

    self._set_attribute(column, time_finished)

time_finished = property(lambda s: s.data.time_finished, set_time_finished)
405
def set_start_not_before(self, start_not_before):
    """
    Stores the value of the start_not_before column.
    """
    column = "start_not_before"

    self._set_attribute(column, start_not_before)

start_not_before = property(lambda s: s.data.start_not_before, set_start_not_before)
2a1e9ce2 410
2a1e9ce2
MT
def get_pkg_by_uuid(self, uuid):
    """
    Returns the package with the given UUID if it is attached to
    this job, otherwise None.

    The job is set on the returned package.
    """
    package = self.backend.packages._get_package("SELECT packages.id FROM packages \
        JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
        WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
        self.id, uuid)

    if not package:
        return None

    package.job = self
    return package
420
@lazy_property
def logfiles(self):
    """
    List of all log files that belong to this job.
    """
    result = []

    rows = self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id)
    for row in rows:
        logfile = logs.LogFile(self.backend, row.id)

        # The job is known already, so store it on the log file
        logfile._job = self

        result.append(logfile)

    return result
432
def add_file(self, filename):
    """
    Add the specified file to this job.

    Files ending in ".log" are attached as log files and files
    with the package extension as packages. Everything else is
    silently ignored.

    The file is copied to the right directory by this function.
    """
    assert os.path.exists(filename)

    if filename.endswith(".log"):
        self._add_file_log(filename)
        return

    if not filename.endswith(".%s" % PACKAGE_EXTENSION):
        return

    # It is not allowed to upload packages on test builds.
    if not self.test:
        self._add_file_package(filename)
450
def _add_file_log(self, filename):
    """
    Attach a log file to this job.

    The file is copied into the "logs" directory of the build and
    registered in the logfiles table together with its size and
    SHA512 hash.

    Logs of test jobs are named test.<arch>.<n>.<uuid>.log with the
    first n (starting at 1) that does not exist yet. All other logs
    are named build.<arch>.<uuid>.log.
    """
    target_dirname = os.path.join(self.build.path, "logs")

    if self.test:
        # Find the first number that is not taken yet
        i = 1
        while True:
            target_filename = os.path.join(target_dirname,
                "test.%s.%s.%s.log" % (self.arch, i, self.uuid))

            if not os.path.exists(target_filename):
                break

            i += 1
    else:
        target_filename = os.path.join(target_dirname,
            "build.%s.%s.log" % (self.arch, self.uuid))

    # Make sure the target directory exists.
    if not os.path.exists(target_dirname):
        os.makedirs(target_dirname)

    # Calculate a SHA512 hash from that file. The with statement
    # closes the file even when reading fails.
    h = hashlib.sha512()
    with open(filename, "rb") as f:
        for buf in iter(lambda: f.read(BUFFER_SIZE), b""):
            h.update(buf)

    # Copy the file to the final location.
    shutil.copy2(filename, target_filename)

    # Create an entry in the database.
    self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
        VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
        os.path.getsize(target_filename), h.hexdigest())
493
def _add_file_package(self, filename):
    """
    Attach a package file to this job.

    The package is created from filename, moved to the directory
    of the build for the architecture of this job and then linked
    to the job in jobs_packages.
    """
    # Open package (creates entry in the database)
    package = self.backend.packages.create(filename)

    # Move package to the build directory.
    target = os.path.join(self.build.path, self.arch)
    package.move(target)

    # Attach the package to this job.
    self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
        self.id, package.id)
504
def get_aborted_state(self):
    """
    The value of the aborted_state column of this job.
    """
    row = self.data

    return row.aborted_state

def set_aborted_state(self, state):
    """
    Stores state in the aborted_state column.
    """
    column = "aborted_state"

    self._set_attribute(column, state)

aborted_state = property(get_aborted_state, set_aborted_state)
512
@property
def message_recipients(self):
    """
    Set of everybody who gets messages about this job.

    These are the recipients of the build plus the maintainer of
    the package on release builds, or the owner of the build on
    scratch builds.
    """
    build = self.build

    # Start with all people watching the build.
    recipients = list(build.message_recipients)

    if build.type == "release":
        # Add the package maintainer on release builds.
        maintainer = self.pkg.maintainer

        if isinstance(maintainer, users.User):
            recipients.append("%s <%s>" % (maintainer.realname, maintainer.email))
        elif maintainer:
            recipients.append(maintainer)

        # XXX add committer and commit author.

    elif build.type == "scratch" and build.user:
        # Add the owner of the scratch build on scratch builds.
        owner = build.user
        recipients.append("%s <%s>" % (owner.realname, owner.email))

    return set(recipients)
537
def save_buildroot(self, pkgs):
    """
    Stores the packages of the buildroot of this job.

    pkgs: Iterable of (package name, package UUID) tuples.

    Any buildroot that has been saved before is replaced.
    """
    # Cleanup old stuff first (for rebuilding packages)
    self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

    for pkg_name, pkg_uuid in pkgs:
        # The values must be passed in the order of the column list
        self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
            VALUES(%s, %s, %s)", self.id, pkg_uuid, pkg_name)
2a1e9ce2 545
6990cac2
MT
@lazy_property
def buildroot(self):
    """
    The saved buildroot of this job, ordered by package name.

    Returns a list of (name, uuid, package) tuples where package is
    whatever the lookup of the UUID in the packages returned.
    """
    rows = self.db.query("SELECT * FROM jobs_buildroots \
        WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

    # Search for each package in the packages table.
    get_by_uuid = self.backend.packages.get_by_uuid

    return [
        (row.pkg_name, row.pkg_uuid, get_by_uuid(row.pkg_uuid)) for row in rows
    ]
558
def send_finished_message(self):
    """
    Sends the "build finished" message to all recipients.

    Nothing is sent for test jobs.
    """
    # Send no finished mails for test jobs.
    if self.test:
        return

    logging.debug("Sending finished message for job %s to %s" % \
        (self.name, ", ".join(self.message_recipients)))

    info = dict(
        build_name=self.name,
        build_host=self.builder.name,
        build_uuid=self.uuid,
    )

    self.backend.messages.send_to_all(self.message_recipients,
        MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
575
def send_failed_message(self):
    """
    Sends the "build failed" message to all recipients.

    The build host is given as "--" when the job has no builder.
    """
    logging.debug("Sending failed message for job %s to %s" % \
        (self.name, ", ".join(self.message_recipients)))

    builder = self.builder
    build_host = builder.name if builder else "--"

    info = dict(
        build_name=self.name,
        build_host=build_host,
        build_uuid=self.uuid,
    )

    self.backend.messages.send_to_all(self.message_recipients,
        MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
592
2a1e9ce2
MT
def get_build_repos(self):
    """
    Returns a list of all repositories that should be used when
    building this job.

    These are the repositories of the distribution that are listed
    for this job in jobs_repos. If none is listed, or none of the
    listed ones belongs to the distribution, the build repositories
    of the distribution are returned instead.
    """
    rows = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
        self.id)

    # The query only selects repo_id, so that is the attribute to
    # read. Collecting the IDs once also works when the database
    # hands back an iterator which is always true.
    repo_ids = set(row.repo_id for row in rows)

    if not repo_ids:
        return self.distro.get_build_repos()

    repos = [repo for repo in self.distro.repositories if repo.id in repo_ids]

    return repos or self.distro.get_build_repos()
610
def get_config(self, local=False):
    """
    Get configuration file that is sent to the builder.

    It consists of the configuration of the distribution followed
    by the configuration of every build repository of this job.
    local is passed on to the repositories.
    """
    # The distribution configuration comes first.
    confs = [self.distro.get_config()]

    # Then add all repositories for this build.
    confs.extend(repo.get_conf(local=local) for repo in self.get_build_repos())

    return "\n\n".join(confs)
626
e0e14322
MT
def set_dependency_check_succeeded(self, value):
    """
    Stores the result of the dependency check together with the
    time (UTC) at which it has been stored.

    A successful check (value is True) clears the message.
    """
    checked_at = datetime.datetime.utcnow()

    self._set_attribute("dependency_check_succeeded", value)
    self._set_attribute("dependency_check_at", checked_at)

    # Reset the message
    if value is True:
        self.message = None

dependency_check_succeeded = property(
    lambda s: s.data.dependency_check_succeeded,
    set_dependency_check_succeeded)
638
def resolvdep(self):
    """
    Checks whether the build dependencies of this job can be
    resolved and stores the result in dependency_check_succeeded.

    When the check fails with a DependencyError, the error is stored
    as the message of the job.
    """
    log.info("Processing dependencies for %s...", self)

    config = pakfire.config.Config(files=["general.conf"])
    config.parse(self.get_config(local=True))

    # The filename of the source file.
    filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
    assert os.path.exists(filename), filename

    # Create a new pakfire instance with the configuration for
    # this build.
    p = pakfire.PakfireServer(config=config, arch=self.arch)

    # Try to solve the build dependencies. The result of the solver
    # is not needed, only whether it raised an error.
    try:
        p.resolvdep(filename)

    # Catch dependency errors and log the problem string.
    except DependencyError as e:
        self.dependency_check_succeeded = False
        self.message = e

    # The dependency check has succeeded
    else:
        self.dependency_check_succeeded = True