]> git.ipfire.org Git - pbs.git/blame - src/buildservice/jobs.py
builders: Show correct job queue
[pbs.git] / src / buildservice / jobs.py
CommitLineData
2a1e9ce2
MT
1#!/usr/bin/python
2
3import datetime
4import hashlib
5import logging
6import os
7import shutil
8import uuid
9
10import pakfire
11import pakfire.config
12
13log = logging.getLogger("builds")
14log.propagate = 1
15
16from . import arches
17from . import base
18from . import logs
19from . import users
20
21from .constants import *
22from .decorators import *
23
class Jobs(base.Object):
    """
    Finder and factory for Job objects, backed by the "jobs" table.
    """

    def _get_job(self, query, *args):
        """
        Runs the query and returns the resulting row as a Job.

        Returns None if the query did not return a row.
        """
        row = self.db.get(query, *args)
        if not row:
            return None

        return Job(self.backend, row.id, data=row)

    def _get_jobs(self, query, *args):
        """
        Generator that yields a Job for every row the query returns.
        """
        for row in self.db.query(query, *args):
            yield Job(self.backend, row.id, data=row)

    def create(self, build, arch, test=False, superseeds=None):
        """
        Inserts a new job for the given build and architecture.

        The job gets a random UUID and a "created" history entry.
        If superseeds is given, that job is marked as being replaced
        by the new one.

        Returns the new job.
        """
        job_uuid = "%s" % uuid.uuid4()

        new_job = self._get_job("INSERT INTO jobs(uuid, build_id, arch, test) \
            VALUES(%s, %s, %s, %s) RETURNING *", job_uuid, build.id, arch, test)
        new_job.log("created")

        # Set cache for Build object.
        new_job.build = build

        # Mark if the new job superseeds some other job
        if superseeds:
            superseeds.superseeded_by = new_job

        return new_job

    def get_by_id(self, id):
        """
        Returns the job with the given ID.
        """
        return self._get_job("SELECT * FROM jobs WHERE id = %s", id)

    def get_by_uuid(self, uuid):
        """
        Returns the job with the given UUID.
        """
        return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)

    def get_active(self, limit=None):
        """
        Returns the jobs that have been started but have not finished,
        ordered by their start time.
        """
        return self._get_jobs("SELECT jobs.* FROM jobs \
            WHERE time_started IS NOT NULL AND time_finished IS NULL \
            ORDER BY time_started LIMIT %s", limit)

    def get_recently_ended(self, limit=None):
        """
        Returns finished jobs, the most recently finished one first.
        """
        return self._get_jobs("SELECT jobs.* FROM jobs \
            WHERE time_finished IS NOT NULL ORDER BY time_finished DESC LIMIT %s", limit)

    def restart_failed(self):
        """
        Restarts all build jobs that failed more than 72 hours ago
        unless their build is in the "broken" state.
        """
        failed_jobs = self._get_jobs("SELECT jobs.* FROM jobs \
            JOIN builds ON builds.id = jobs.build_id \
            WHERE \
                jobs.type = 'build' AND \
                jobs.state = 'failed' AND \
                NOT builds.state = 'broken' AND \
                jobs.time_finished < NOW() - '72 hours'::interval \
            ORDER BY \
                CASE \
                    WHEN jobs.type = 'build' THEN 0 \
                    WHEN jobs.type = 'test' THEN 1 \
                END, \
                builds.priority DESC, jobs.time_created ASC")

        # Restart the job
        for failed_job in failed_jobs:
            failed_job.restart()
2a1e9ce2
MT
88
89
90class Job(base.DataObject):
91 table = "jobs"
92
def __str__(self):
    """
    Returns a short description made of class name, ID and job name.
    """
    fields = (self.__class__.__name__, self.id, self.name)

    return "<%s id=%s %s>" % fields
95
def __eq__(self, other):
    """
    Two jobs are equal if they have the same ID.

    Returns NotImplemented for objects of any other type so that
    Python can try the reflected comparison before it falls back
    to its default.
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    return self.id == other.id
99
def __lt__(self, other):
    """
    Defines the order in which jobs are sorted.

    Build jobs always come before test jobs. Two jobs of the same
    kind are ordered by arches.priority() of their architecture if
    they belong to the same build and by creation time otherwise.

    Returns NotImplemented if other is not a job.
    """
    if not isinstance(other, self.__class__):
        return NotImplemented

    # Decide "build vs. test" in both directions. Checking only one
    # direction lets a test job that was created earlier compare
    # as smaller than a build job which in turn compares as smaller
    # than the test job.
    if bool(self.test) != bool(other.test):
        return not self.test

    if self.build == other.build:
        return arches.priority(self.arch) < arches.priority(other.arch)

    return self.time_created < other.time_created
109
def __iter__(self):
    """
    Iterates over the packages that are linked to this job in
    jobs_packages, ordered by package name.
    """
    query = "SELECT packages.* FROM jobs_packages \
        LEFT JOIN packages ON jobs_packages.pkg_id = packages.id \
        WHERE jobs_packages.job_id = %s ORDER BY packages.name"

    return iter(self.backend.packages._get_packages(query, self.id))
116
def __nonzero__(self):
    """
    A job is always true.

    Without this hook, truth testing falls back to __len__() and a
    job that has no packages would be treated as false.
    """
    return True

# Python 3 looks for __bool__ instead of __nonzero__.
__bool__ = __nonzero__
119
def __len__(self):
    """
    Returns the number of rows in jobs_packages for this job.
    """
    row = self.db.get("SELECT COUNT(*) AS len FROM jobs_packages \
        WHERE job_id = %s", self.id)

    return row.len
125
@property
def distro(self):
    """
    The distribution of the build this job belongs to.
    """
    build = self.build

    return build.distro
129
96cc81de
MT
def restart(self):
    """
    Creates a copy of this job (same build, architecture and test
    flag) which superseeds this one.

    Returns the new job.
    """
    # Copy the job and let it build again
    jobs = self.backend.jobs

    return jobs.create(self.build, self.arch, test=self.test, superseeds=self)
134
3f516e41
MT
def get_superseeded_by(self):
    """
    Returns the job that has replaced this one.

    Returns None if no superseeding job is stored.
    """
    if self.data.superseeded_by:
        return self.backend.jobs.get_by_id(self.data.superseeded_by)

def set_superseeded_by(self, superseeded_by):
    """
    Stores the ID of the job that replaces this one.

    superseeded_by must be an instance of the same class.
    """
    assert isinstance(superseeded_by, self.__class__)

    # Only write the attribute. This function is the setter of the
    # superseeded_by property, so assigning self.superseeded_by in
    # here would call it again; set_builder() does not do that either.
    # NOTE(review): assumes lazy_property calls this setter on
    # assignment (Jobs.create() relies on that) - confirm against
    # the decorators module.
    self._set_attribute("superseeded_by", superseeded_by.id)

superseeded_by = lazy_property(get_superseeded_by, set_superseeded_by)
146
def delete(self):
    """
    Deletes a job from the database

    Buildroot, history, packages and logfiles are removed before
    the row in the jobs table.
    """
    execute = self.db.execute

    # Remove the buildroot
    execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

    # Remove the history
    execute("DELETE FROM jobs_history WHERE job_id = %s", self.id)

    # Delete all packages
    for package in self:
        execute("DELETE FROM jobs_packages \
            WHERE job_id = %s AND pkg_id = %s", self.id, package.id)
        package.delete()

    # Remove all logfiles
    for logfile in self.logfiles:
        path = os.path.join(PACKAGES_DIR, logfile.path)
        self.backend.delete_file(path)

    execute("DELETE FROM logfiles WHERE job_id = %s", self.id)

    # Delete the job itself.
    execute("DELETE FROM jobs WHERE id = %s", self.id)
171
2a1e9ce2
MT
172 ## Logging stuff
173
def log(self, action, user=None, state=None, builder=None, test_job=None):
    """
    Adds an entry to the history of this job (jobs_history).

    Of user, builder and test_job only the ID is stored; NULL is
    stored for those that are not given.
    """
    user_id = user.id if user else None
    builder_id = builder.id if builder else None
    test_job_id = test_job.id if test_job else None

    self.db.execute("INSERT INTO jobs_history(job_id, action, state, user_id, \
        time, builder_id, test_job_id) VALUES(%s, %s, %s, %s, NOW(), %s, %s)",
        self.id, action, state, user_id, builder_id, test_job_id)
190
def get_log(self, limit=None, offset=None, user=None):
    """
    Returns the history of this job as a list of logs.JobLogEntry
    objects, newest entry first.

    limit  - maximum number of entries (no limit if not set)
    offset - number of entries to skip; only used together with limit
    user   - if set, only entries of this user are returned
    """
    conditions = ["job_id = %s"]
    args = [self.id]

    if user:
        conditions.append("user_id = %s")
        args.append(user.id)

    query = "SELECT * FROM jobs_history WHERE %s ORDER BY time DESC" \
        % " AND ".join(conditions)

    if limit:
        # The other queries in this file use PostgreSQL syntax
        # (RETURNING, ::interval). PostgreSQL does not accept the
        # MySQL form "LIMIT offset,count", so use LIMIT ... OFFSET ...
        query += " LIMIT %s"
        args.append(limit)

        if offset:
            query += " OFFSET %s"
            args.append(offset)

    return [logs.JobLogEntry(self.backend, entry)
        for entry in self.db.query(query, *args)]
220
@property
def uuid(self):
    """
    The UUID stored with this job.
    """
    record = self.data

    return record.uuid
224
@property
def test(self):
    """
    The "test" flag stored with this job.
    """
    record = self.data

    return record.test
2a1e9ce2
MT
228
@property
def build_id(self):
    """
    The ID of the build this job belongs to.
    """
    record = self.data

    return record.build_id
232
@lazy_property
def build(self):
    """
    The build this job belongs to, looked up by build_id.
    """
    builds = self.backend.builds

    return builds.get_by_id(self.build_id)
236
@property
def related_jobs(self):
    """
    All other jobs of the same build as a list.
    """
    return [job for job in self.build.jobs if not job == self]
248
@property
def pkg(self):
    """
    The package of the build this job belongs to.
    """
    build = self.build

    return build.pkg
252
@property
def name(self):
    """
    The name of the job: "<package name>-<friendly version>.<arch>"
    """
    package = self.pkg
    parts = (package.name, package.friendly_version, self.arch)

    return "%s-%s.%s" % parts
256
@property
def size(self):
    """
    The sum of the sizes of all packages in self.packages.
    """
    # NOTE(review): no "packages" attribute is defined in the part of
    # the class that is visible here (the packages of a job are
    # reachable by iterating over it) - confirm it exists elsewhere.
    total = 0
    for package in self.packages:
        total += package.size

    return total
260
@lazy_property
def rank(self):
    """
    Returns the rank in the build queue

    Only pending jobs have a rank. None is returned for all other
    jobs and for jobs that have no row in jobs_queue.
    """
    if self.state == "pending":
        row = self.db.get("SELECT rank FROM jobs_queue WHERE job_id = %s", self.id)

        if row:
            return row.rank

    return None
273
def is_running(self):
    """
    Returns True if job is in a running state.

    These are: pending, dispatching, running and uploading.
    """
    running_states = ("pending", "dispatching", "running", "uploading")

    return self.state in running_states
279
def get_state(self):
    """
    Returns the state stored with this job.
    """
    return self.data.state

def set_state(self, state, user=None, log=True):
    """
    Moves the job into the given state.

    A state_change history entry is written unless log is false or
    the new state is "new". Start and finish time are set, the
    finished/failed messages are sent and (for non-test jobs) the
    state of the build is updated.
    """
    # NOTE(review): the indentation of this method was lost in the
    # copy this was taken from; the nesting follows the comments
    # ("Nothing to do if the state remains", "Always clear ...").

    # Nothing to do if the state remains.
    changed = not self.state == state
    if changed:
        self._set_attribute("state", state)

        # Log the event.
        if log and not state == "new":
            self.log("state_change", state=state, user=user)

    # Always clear the message when the status is changed.
    self.update_message(None)

    # Update some more informations.
    if state == "dispatching":
        # Set start time.
        self._set_attribute("time_started", datetime.datetime.utcnow())

    elif state in ("aborted", "dependency_error", "finished", "failed"):
        self._set_attribute("time_finished", datetime.datetime.utcnow())

        # Send messages to the user.
        notifiers = {
            "finished" : self.send_finished_message,
            "failed"   : self.send_failed_message,
        }

        notify = notifiers.get(state)
        if notify:
            notify()

    # Automatically update the state of the build (not on test builds).
    if not self.test:
        self.build.auto_update_state()

state = property(get_state, set_state)
315
@property
def message(self):
    """
    The message stored with this job.
    """
    record = self.data

    return record.message
319
c2fb4460
MT
def update_message(self, message):
    """
    Stores a new message with this job.
    """
    attribute = "message"

    self._set_attribute(attribute, message)
2a1e9ce2
MT
322
def get_builder(self):
    """
    Returns the builder that is assigned to this job or None if
    there is no builder ID stored.
    """
    builder_id = self.data.builder_id
    if not builder_id:
        return None

    return self.backend.builders.get_by_id(builder_id)

def set_builder(self, builder, user=None):
    """
    Assigns the job to the given builder.

    A builder_assigned history entry is only written when a user
    is passed.
    """
    self._set_attribute("builder_id", builder.id)

    # Log the event.
    if not user:
        return

    self.log("builder_assigned", builder=builder, user=user)

builder = lazy_property(get_builder, set_builder)
335
@property
def arch(self):
    """
    The architecture stored with this job.
    """
    record = self.data

    return record.arch
339
@property
def duration(self):
    """
    The runtime of the job in seconds.

    This is 0 for jobs that have not been started. For jobs that
    have not finished yet, the time since the start is returned.
    """
    started = self.time_started
    if not started:
        return 0

    ended = self.time_finished or datetime.datetime.utcnow()

    return (ended - started).total_seconds()
351
@property
def time_created(self):
    """
    The creation time stored with this job.
    """
    record = self.data

    return record.time_created
355
@property
def time_started(self):
    """
    The start time stored with this job.
    """
    record = self.data

    return record.time_started
359
@property
def time_finished(self):
    """
    The finish time stored with this job.
    """
    record = self.data

    return record.time_finished
363
2a1e9ce2
MT
def get_pkg_by_uuid(self, uuid):
    """
    Returns the package with the given UUID if it is linked to this
    job in jobs_packages.

    The job attribute of the package is set to this job.
    """
    package = self.backend.packages._get_package("SELECT packages.id FROM packages \
        JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
        WHERE jobs_packages.job_id = %s AND packages.uuid = %s",
        self.id, uuid)

    if not package:
        return None

    package.job = self

    return package
373
@lazy_property
def logfiles(self):
    """
    A list with a logs.LogFile object for every row in the logfiles
    table that belongs to this job.
    """
    result = []

    rows = self.db.query("SELECT id FROM logfiles WHERE job_id = %s", self.id)
    for row in rows:
        logfile = logs.LogFile(self.backend, row.id)

        # Let the log file know which job it belongs to.
        logfile._job = self

        result.append(logfile)

    return result
385
def add_file(self, filename):
    """
    Add the specified file to this job.

    The file is copied to the right directory by this function.

    Files ending in ".log" are added as log files. Package files
    are added as packages unless this is a test job. Any other
    file is ignored.
    """
    assert os.path.exists(filename)

    if filename.endswith(".log"):
        self._add_file_log(filename)
        return

    if not filename.endswith(".%s" % PACKAGE_EXTENSION):
        return

    # It is not allowed to upload packages on test builds.
    if self.test:
        return

    self._add_file_package(filename)
403
def _add_file_log(self, filename):
    """
    Attach a log file to this job.

    The file is copied into the "logs" directory of the build and
    registered in the logfiles table with its path (relative to
    PACKAGES_DIR), size and SHA512 hash.
    """
    target_dirname = os.path.join(self.build.path, "logs")

    if self.test:
        # Use the first index for which no file exists, yet.
        i = 1
        while True:
            target_filename = os.path.join(target_dirname,
                "test.%s.%s.%s.log" % (self.arch, i, self.uuid))

            if not os.path.exists(target_filename):
                break

            i += 1
    else:
        target_filename = os.path.join(target_dirname,
            "build.%s.%s.log" % (self.arch, self.uuid))

    # Make sure the target directory exists.
    if not os.path.exists(target_dirname):
        os.makedirs(target_dirname)

    # Calculate a SHA512 hash from that file.
    # The with block closes the file even if reading fails.
    h = hashlib.sha512()
    with open(filename, "rb") as f:
        for buf in iter(lambda: f.read(BUFFER_SIZE), b""):
            h.update(buf)

    # Copy the file to the final location.
    shutil.copy2(filename, target_filename)

    # Create an entry in the database.
    self.db.execute("INSERT INTO logfiles(job_id, path, filesize, hash_sha512) \
        VALUES(%s, %s, %s, %s)", self.id, os.path.relpath(target_filename, PACKAGES_DIR),
        os.path.getsize(target_filename), h.hexdigest())
446
def _add_file_package(self, filename):
    """
    Creates a package from the file, moves it into the directory of
    this architecture inside the build directory and links it to
    this job in jobs_packages.
    """
    # Open package (creates entry in the database)
    package = self.backend.packages.create(filename)

    # Move package to the build directory.
    target_dir = os.path.join(self.build.path, self.arch)
    package.move(target_dir)

    # Attach the package to this job.
    self.db.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
        self.id, package.id)
457
def get_aborted_state(self):
    """
    Returns the aborted_state value stored with this job.
    """
    record = self.data

    return record.aborted_state

def set_aborted_state(self, state):
    """
    Stores a new aborted_state value with this job.
    """
    attribute = "aborted_state"

    self._set_attribute(attribute, state)

aborted_state = property(get_aborted_state, set_aborted_state)
465
@property
def message_recipients(self):
    """
    The set of recipients for messages about this job.

    These are the recipients of the build plus the package
    maintainer on release builds or the owner of the build on
    scratch builds.
    """
    # Add all people watching the build.
    recipients = list(self.build.message_recipients)

    build_type = self.build.type

    # Add the package maintainer on release builds.
    if build_type == "release":
        maint = self.pkg.maintainer

        if isinstance(maint, users.User):
            recipients.append("%s <%s>" % (maint.realname, maint.email))
        elif maint:
            recipients.append(maint)

        # XXX add committer and commit author.

    # Add the owner of the scratch build on scratch builds.
    elif build_type == "scratch" and self.build.user:
        owner = self.build.user
        recipients.append("%s <%s>" % (owner.realname, owner.email))

    return set(recipients)
490
def save_buildroot(self, pkgs):
    """
    Replaces the buildroot that is stored for this job.

    pkgs is an iterable of 2-tuples. One row is inserted into
    jobs_buildroots for each of them.
    """
    execute = self.db.execute

    # Cleanup old stuff first (for rebuilding packages)
    execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

    # NOTE(review): the column list is (job_id, pkg_uuid, pkg_name)
    # but the values are passed as (id, pkg_name, pkg_uuid). So the
    # first element of every tuple ends up in the pkg_uuid column.
    # Either the variables are misnamed or the values are swapped -
    # confirm against the caller. The order is kept as it was.
    for pkg_name, pkg_uuid in pkgs:
        execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
            VALUES(%s, %s, %s)", self.id, pkg_name, pkg_uuid)
2a1e9ce2 498
6990cac2
MT
@lazy_property
def buildroot(self):
    """
    The buildroot stored for this job as a list of
    (pkg_name, pkg_uuid, package) tuples, ordered by pkg_name.

    package is the result of looking up pkg_uuid in the packages.
    """
    rows = self.db.query("SELECT * FROM jobs_buildroots \
        WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self.id)

    # Search for every package in the packages table.
    lookup = self.backend.packages.get_by_uuid

    return [(row.pkg_name, row.pkg_uuid, lookup(row.pkg_uuid)) for row in rows]
511
def send_finished_message(self):
    """
    Sends the "build finished" message to all message recipients
    of this job.

    Nothing is sent for test jobs.
    """
    # Send no finished mails for test jobs.
    if self.test:
        return

    logging.debug("Sending finished message for job %s to %s" % \
        (self.name, ", ".join(self.message_recipients)))

    # Use the same fallback as send_failed_message() so that a job
    # without a builder does not raise an AttributeError in here.
    build_host = "--"
    if self.builder:
        build_host = self.builder.name

    info = {
        "build_name" : self.name,
        "build_host" : build_host,
        "build_uuid" : self.uuid,
    }

    self.backend.messages.send_to_all(self.message_recipients,
        MSG_BUILD_FINISHED_SUBJECT, MSG_BUILD_FINISHED, info)
528
def send_failed_message(self):
    """
    Sends the "build failed" message to all message recipients of
    this job.

    The build host is given as "--" if the job has no builder.
    """
    logging.debug("Sending failed message for job %s to %s" % \
        (self.name, ", ".join(self.message_recipients)))

    build_host = self.builder.name if self.builder else "--"

    info = {
        "build_name" : self.name,
        "build_host" : build_host,
        "build_uuid" : self.uuid,
    }

    messages = self.backend.messages
    messages.send_to_all(self.message_recipients,
        MSG_BUILD_FAILED_SUBJECT, MSG_BUILD_FAILED, info)
545
def set_start_time(self, start_not_before):
    """
    Stores the given value in the start_not_before attribute.
    """
    attribute = "start_not_before"

    self._set_attribute(attribute, start_not_before)
548
def schedule(self, type, start_time=None, user=None):
    """
    Schedules a rebuild of this job or a test job for it.

    type must be "rebuild" or "test". A rebuild is only scheduled
    for jobs that are not finished and a test only for jobs that
    are finished.

    Returns the new job or None if nothing has been scheduled.
    """
    assert type in ("rebuild", "test")

    if type == "rebuild":
        if self.state == "finished":
            return

        job = self.restart()
        job.set_start_time(start_time)

        # Log the event.
        self.log("schedule_rebuild", user=user)

    elif type == "test":
        if not self.state == "finished":
            return

        # Create a new job with same build and arch.
        # Jobs are created by the jobs collection of the backend
        # (see restart()) which takes (build, arch, ...).
        job = self.backend.jobs.create(self.build, self.arch, test=True)
        job.set_start_time(start_time)

        # Log the event.
        self.log("schedule_test_job", test_job=job, user=user)

    return job
574
def schedule_test(self, start_not_before=None, user=None):
    """
    Shortcut for schedule("test", ...).
    """
    # XXX to be removed
    job = self.schedule("test", start_time=start_not_before, user=user)

    return job
578
def schedule_rebuild(self, start_not_before=None, user=None):
    """
    Shortcut for schedule("rebuild", ...).
    """
    # XXX to be removed
    job = self.schedule("rebuild", start_time=start_not_before, user=user)

    return job
582
def get_build_repos(self):
    """
    Returns a list of all repositories that should be used when
    building this job.

    These are the repositories of the distribution that are listed
    for this job in jobs_repos. If there are none, the build
    repositories of the distribution are returned.
    """
    rows = self.db.query("SELECT repo_id FROM jobs_repos WHERE job_id = %s",
        self.id)

    # The query only selects the "repo_id" column, so that is the
    # attribute to read from the rows. Collect the IDs once.
    repo_ids = set(row.repo_id for row in rows)

    if not repo_ids:
        return self.distro.get_build_repos()

    repos = [repo for repo in self.distro.repositories if repo.id in repo_ids]

    return repos or self.distro.get_build_repos()
600
def get_config(self, local=False):
    """
    Get configuration file that is sent to the builder.

    This is the configuration of the distribution followed by the
    configuration of every build repository of this job, separated
    by empty lines. local is passed on to the repositories.
    """
    # Add the distribution configuration.
    confs = [self.distro.get_config()]

    # Then add all repositories for this build.
    confs.extend(repo.get_conf(local=local) for repo in self.get_build_repos())

    return "\n\n".join(confs)
616
617 def resolvdep(self):
618 config = pakfire.config.Config(files=["general.conf"])
e68f48c3 619 config.parse(self.get_config(local=True))
2a1e9ce2
MT
620
621 # The filename of the source file.
622 filename = os.path.join(PACKAGES_DIR, self.build.pkg.path)
623 assert os.path.exists(filename), filename
624
625 # Create a new pakfire instance with the configuration for
626 # this build.
627 p = pakfire.PakfireServer(config=config, arch=self.arch)
628
629 # Try to solve the build dependencies.
630 try:
631 solver = p.resolvdep(filename)
632
633 # Catch dependency errors and log the problem string.
634 except DependencyError, e:
635 self.state = "dependency_error"
9b652889 636 self.update_message("%s" % e)
2a1e9ce2
MT
637
638 else:
639 # If the build dependencies can be resolved, we set the build in
640 # pending state.
641 if solver.status is True:
642 if self.state in ("failed",):
643 return
644
645 self.state = "pending"