# Source: pbs.git — src/buildservice/jobs.py
# (blob 5d5d984c7de961e26f241ab815bd3b1cfdcd9535, scraped from git.ipfire.org)
17 from .constants
import *
18 from .decorators
import *
21 log
= logging
.getLogger("pakfire.buildservice.jobs")
23 class Jobs(base
.Object
):
def _get_job(self, query, *args):
    """
    Runs the given SQL query and maps the first result row to a Job.

    Returns None when the query matches nothing.
    """
    res = self.db.get(query, *args)

    # db.get() returns None when no row matched; without this guard
    # the attribute access `res.id` would raise AttributeError.
    if res:
        return Job(self.backend, res.id, data=res)
def _get_jobs(self, query, *args):
    """
    Runs the given SQL query and yields a Job for every result row.

    This is a generator; no database row is mapped until iterated.
    """
    res = self.db.query(query, *args)

    # Iterate the result set row by row — as scraped, `row` was used
    # without this loop, which would be a NameError.
    for row in res:
        yield Job(self.backend, row.id, data=row)
36 def create(self
, build
, arch
, test
=False, superseeds
=None):
37 job
= self
._get
_job
("""
57 # Set cache for Build object
60 # Mark if the new job superseeds some other job
62 superseeds
.superseeded_by
= job
def get_by_id(self, id):
    """
    Fetches a single job by its database ID (or None if unknown).
    """
    query = "SELECT * FROM jobs WHERE id = %s"

    return self._get_job(query, id)
def get_by_uuid(self, uuid):
    """
    Fetches a single job by its UUID (or None if unknown).
    """
    query = "SELECT * FROM jobs WHERE uuid = %s"

    return self._get_job(query, uuid)
74 jobs
= self
._get
_jobs
("""
80 started_at IS NOT NULL
async def depcheck(self, jobs=None):
    """
    Performs a dependency check on all given jobs concurrently.

    jobs defaults to None, in which case there is nothing to check and
    this is a no-op.  (TODO confirm whether the default was meant to
    check some implicit set of jobs instead.)
    """
    # Tolerate the documented default — iterating None would raise
    # TypeError.
    if jobs is None:
        jobs = []

    await asyncio.gather(*(job.depcheck() for job in jobs))
96 class Job(base
.DataObject
):
100 return "<%s id=%s %s>" % (self
.__class
__.__name
__, self
.id, self
.name
)
105 def __lt__(self
, other
):
106 if isinstance(other
, self
.__class
__):
107 if not self
.test
and other
.test
:
110 if self
.build
== other
.build
:
111 return arches
.priority(self
.arch
) < arches
.priority(other
.arch
)
113 return self
.time_created
< other
.time_created
115 return NotImplemented
119 return self
.data
.uuid
123 return "%s-%s.%s" % (self
.pkg
.name
, self
.pkg
.evr
, self
.arch
)
127 return self
.backend
.builds
.get_by_id(self
.data
.build_id
)
131 return self
.data
.test
def related_jobs(self):
    """
    Returns all sibling jobs of the same build, excluding this one.
    """
    siblings = []

    for job in self.build.jobs:
        # Keep the exact original comparison (`not self == job`) so any
        # custom __eq__ semantics are preserved.
        if not self == job:
            siblings.append(job)

    return siblings
142 return self
.build
.pkg
146 packages
= self
.backend
.packages
._get
_packages
("""
152 packages ON jobs_packages.pkg_id = packages.id
154 jobs_packages.job_id = %s
160 return list(packages
)
164 return sum((p
.size
for p
in self
.packages
))
167 def estimated_build_time(self
):
169 Returns the time we expect this job to run for
171 res
= self
.db
.get("""
175 package_estimated_build_times
180 self
.pkg
.name
, self
.arch
,
184 return res
.build_time
188 return self
.build
.distro
def get_superseeded_by(self):
    """
    Returns the job that superseeded this one, or None if there is none.
    """
    superseeded_by = self.data.superseeded_by

    # No superseeding job recorded — fall through to None.
    if not superseeded_by:
        return None

    return self.backend.jobs.get_by_id(superseeded_by)
def set_superseeded_by(self, superseeded_by):
    """
    Marks this job as superseeded by the given job.

    Raises TypeError if the argument is not a job of the same class.
    """
    # A bare `assert` is stripped under `python -O`; validate explicitly.
    if not isinstance(superseeded_by, self.__class__):
        raise TypeError("Expected a %s, got %r" % (
            self.__class__.__name__, superseeded_by))

    self._set_attribute("superseeded_by", superseeded_by.id)
199 superseeded_by
= lazy_property(get_superseeded_by
, set_superseeded_by
)
def created_at(self):
    """
    Timestamp of when this job was created.
    """
    return self.data.created_at
def started_at(self):
    """
    Timestamp of when this job was started (None if never started).
    """
    return self.data.started_at
def finished_at(self):
    """
    Timestamp of when this job finished (None if still running).
    """
    return self.data.finished_at
def start(self, builder):
    """
    Starts this job on the given builder: records the builder and the
    start time.
    """
    log.info("Starting job %s on %s" % (self, builder))

    # Store the assigned builder.
    # NOTE(review): the raw `builder` value is stored under "builder_id" —
    # confirm whether this should be `builder.id` instead.
    self._set_attribute("builder_id", builder)

    # Record when the job was started.
    self._set_attribute_now("started_at")
234 async def finished(self
, success
, message
=None, log
=None):
236 Called when this job has finished
239 self
._set
_attribute
_now
("finished_at")
243 await self
._import
_log
(log
)
245 # Did this build fail?
251 self
._set
_attribute
("message", message
)
255 self
.send_finished_message()
257 self
._set
_attribute
("failed", True)
258 self
.send_failed_message()
260 # XXX propagate any changes to the build
def is_running(self):
    """
    A job counts as running once it has been started but not yet
    finished.  (Returns the original truthy/falsy expression value,
    not a strict bool.)
    """
    started = self.started_at
    finished = self.finished_at

    return started and not finished
268 def has_finished(self
):
277 Indicates whether this job has failed
279 return self
.data
.failed
283 return self
.data
.message
287 Deletes a job from the database
289 # Remove the buildroot
290 self
.db
.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self
.id)
293 self
.db
.execute("DELETE FROM jobs_history WHERE job_id = %s", self
.id)
295 # Delete all packages
297 self
.db
.execute("DELETE FROM jobs_packages \
298 WHERE job_id = %s AND pkg_id = %s", self
.id, pkg
.id)
301 # Remove all logfiles
302 for logfile
in self
.logfiles
:
303 path
= self
.backend
.path("packages", logfile
.path
)
304 self
.backend
.delete_file(path
)
306 self
.db
.execute("DELETE FROM logfiles WHERE job_id = %s", self
.id)
308 # Delete the job itself.
309 self
.db
.execute("DELETE FROM jobs WHERE id = %s", self
.id)
313 Clones this build job
315 job
= self
.backend
.jobs
.create(
322 log
.debug("Cloned job %s as %s" % (self
, job
))
336 return self
.backend
.path_to_url(self
.log_path
)
340 return self
.data
.log_path
344 return self
.data
.log_size
def log_digest_blake2s(self):
    """
    The stored BLAKE2s digest of this job's build log.
    """
    return self.data.log_digest_blake2s
350 async def _import_log(self
, upload
):
351 # Create some destination path
352 path
= self
.backend
.path(
357 "%s.log" % self
.uuid
[4:],
360 # Copy file to its destination
361 await self
.backend
.copy(upload
.path
, path
)
363 # Compute a digest for integrity
364 digest
= await upload
.digest("blake2s")
366 # Store everything in the database
367 self
._set
_attribute
("log_path", path
)
368 self
._set
_attribute
("log_size", upload
.size
)
369 self
._set
_attribute
("log_digest_blake2s", digest
)
375 if self
.data
.builder_id
:
376 return self
.backend
.builders
.get_by_id(self
.data
.builder_id
)
380 return self
.data
.arch
385 Returns the total build duration or elapsed time
387 if self
.has_finished():
388 return self
.finished_at
- self
.started_at
390 return datetime
.datetime
.utcnow() - self
.started_at
392 def add_file(self
, filename
):
394 Add the specified file to this job.
396 The file is copied to the right directory by this function.
398 assert os
.path
.exists(filename
)
400 if filename
.endswith(".%s" % PACKAGE_EXTENSION
):
401 # It is not allowed to upload packages on test builds.
405 # Open package (creates entry in the database)
406 pkg
= self
.backend
.packages
.create(filename
)
408 # Move package to the build directory.
409 pkg
.move(os
.path
.join(self
.build
.path
, self
.arch
))
411 # Attach the package to this job.
412 self
.db
.execute("INSERT INTO jobs_packages(job_id, pkg_id) VALUES(%s, %s)",
416 def message_recipients(self
):
419 # Add all people watching the build.
420 l
+= self
.build
.message_recipients
422 # Add the package maintainer on release builds.
423 if self
.build
.type == "release":
424 maint
= self
.pkg
.maintainer
426 if isinstance(maint
, users
.User
):
427 l
.append("%s <%s>" % (maint
.realname
, maint
.email
))
431 # XXX add committer and commit author.
433 # Add the owner of the scratch build on scratch builds.
434 elif self
.build
.type == "scratch" and self
.build
.user
:
435 l
.append("%s <%s>" % \
436 (self
.build
.user
.realname
, self
.build
.user
.email
))
def save_buildroot(self, pkgs):
    """
    Stores the buildroot for this job.

    pkgs is an iterable of (pkg_name, pkg_uuid) tuples.  Any previously
    stored buildroot is removed first so packages can be rebuilt.
    """
    # Cleanup old stuff first (for rebuilding packages)
    self.db.execute("DELETE FROM jobs_buildroots WHERE job_id = %s", self.id)

    for pkg_name, pkg_uuid in pkgs:
        # Fix: pass the values in column order (job_id, pkg_uuid,
        # pkg_name) — the previous code swapped name and uuid.
        self.db.execute("INSERT INTO jobs_buildroots(job_id, pkg_uuid, pkg_name) \
            VALUES(%s, %s, %s)", self.id, pkg_uuid, pkg_name)
450 rows
= self
.db
.query("SELECT * FROM jobs_buildroots \
451 WHERE jobs_buildroots.job_id = %s ORDER BY pkg_name", self
.id)
455 # Search for this package in the packages table.
456 pkg
= self
.backend
.packages
.get_by_uuid(row
.pkg_uuid
)
457 pkgs
.append((row
.pkg_name
, row
.pkg_uuid
, pkg
))
def send_finished_message(self):
    """
    Notifies all message recipients that this job has finished.

    Test jobs send no mail at all.
    """
    # Send no finished mails for test jobs.
    if self.test:
        return

    logging.debug("Sending finished message for job %s to %s" % \
        (self.name, ", ".join(self.message_recipients)))

    self.backend.messages.send_template_to_many(self.message_recipients,
        "messages/jobs/finished", job=self)
def send_failed_message(self):
    """
    Notifies all message recipients that this job has failed.
    """
    logging.debug(
        "Sending failed message for job %s to %s" % (
            self.name, ", ".join(self.message_recipients),
        )
    )

    self.backend.messages.send_template_to_many(
        self.message_recipients, "messages/jobs/failed", job=self,
    )
482 Generate the Pakfire configuration for this job
484 return self
.backend
.pakfire(distro
=self
.distro
, repos
=[self
.build
.build_repo
])
async def depcheck(self):
    """
    Performs the dependency check for this job.

    The blocking check runs in a worker thread so the event loop is
    never stalled, wrapped in a single database transaction.
    """
    log.info("Performing dependency check for %s" % self)

    with self.db.transaction():
        return await asyncio.to_thread(self._depcheck)
496 # Create a Pakfire instance
497 with self
.pakfire() as p
:
498 # Try to install the source package
500 p
.install([self
.pkg
.path
], dryrun
=True)
502 # XXX Pakfire should throw a better exception
503 except Exception as e
:
504 self
._set
_attribute
("depcheck_succeeded", False)
507 self
._set
_attribute
("message", "%s" % e
)
511 self
._set
_attribute
("depcheck_succeeded", True)
513 # Store the timestamp
514 self
._set
_attribute
_now
("depcheck_performed_at")
def depcheck_succeeded(self):
    """
    Whether the most recent dependency check succeeded.
    """
    return self.data.depcheck_succeeded
def depcheck_performed_at(self):
    """
    Timestamp of the most recent dependency check.
    """
    return self.data.depcheck_performed_at