From: Michael Tremer Date: Wed, 20 Sep 2023 12:16:18 +0000 (+0000) Subject: jobs: Automatically abort any jobs after three hours X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4f88a13e7eccde07888d983f96a890dc363a73c7;p=pbs.git jobs: Automatically abort any jobs after three hours If builds hang or have some other problem, we will automatically abort them after three hours so that we don't keep the builders up idle. Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py index 862ed46b..fc5a809e 100644 --- a/src/buildservice/__init__.py +++ b/src/buildservice/__init__.py @@ -117,6 +117,9 @@ class Backend(object): # Cleanup regularly self.run_periodic_task(3600, self.cleanup) + # Automatically abort any jobs that run for forever + self.run_periodic_task(60, self.jobs.abort) + def read_config(self, path): c = configparser.ConfigParser() diff --git a/src/buildservice/builds.py b/src/buildservice/builds.py index 95f0ad09..bc2649e1 100644 --- a/src/buildservice/builds.py +++ b/src/buildservice/builds.py @@ -1,6 +1,7 @@ #!/usr/bin/python import asyncio +import datetime import itertools import logging import os @@ -276,7 +277,7 @@ class Builds(base.Object): return list(builds) async def create(self, repo, package, owner=None, group=None, test=False, - disable_test_builds=False): + disable_test_builds=False, timeout=None): """ Creates a new build based on the given distribution and package """ @@ -288,6 +289,10 @@ class Builds(base.Object): if not package.path or not await self.backend.exists(package.path): raise RuntimeError("Package %s does not exist (path = %s)" % (package, package.path)) + # Set a default timeout + if timeout is None: + timeout = datetime.timedelta(hours=3) + build = self._get_build(""" INSERT INTO builds @@ -320,7 +325,7 @@ class Builds(base.Object): group.builds.append(build) # Create all jobs - build._create_jobs() + build._create_jobs(timeout=timeout) if not build.is_test(): # Deprecate 
previous builds @@ -604,13 +609,13 @@ class Build(base.DataObject): return self._get_jobs("SELECT * FROM jobs \ WHERE build_id = %s", self.id) - def _create_jobs(self): + def _create_jobs(self, **kwargs): """ Called after a build has been created and creates all jobs """ # Create the jobs for arch in self.arches: - self.backend.jobs.create(self, arch) + self.backend.jobs.create(self, arch, **kwargs) async def _job_finished(self, job): """ diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index 40da06e4..95427985 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -72,22 +72,23 @@ class Jobs(base.Object): def _get_jobs(self, query, *args, **kwargs): return self.db.fetch_many(Job, query, *args, **kwargs) - def create(self, build, arch, superseeds=None): + def create(self, build, arch, superseeds=None, timeout=None): job = self._get_job(""" INSERT INTO jobs ( build_id, - arch + arch, + timeout ) VALUES ( - %s, - %s + %s, %s, %s ) RETURNING *""", build, arch, + timeout, # Populate cache build=build, @@ -182,6 +183,34 @@ class Jobs(base.Object): # Request dispatch await self.backend.jobs.queue.dispatch() + async def abort(self): + """ + This is periodically called to abort any jobs that have crashed on the + builders for unknown reasons. + """ + log.debug("Aborting timed-out jobs...") + + jobs = self._get_jobs(""" + SELECT + * + FROM + jobs + WHERE + deleted_at IS NULL + AND + finished_at IS NULL + AND + started_at IS NOT NULL + AND + timeout IS NOT NULL + AND + started_at + timeout < CURRENT_TIMESTAMP + """) + + # Abort them all... 
+ for job in jobs: + await job.abort() + class Queue(base.Object): # Locked when the queue is being processed diff --git a/src/database.sql b/src/database.sql index 73a38542..690526b2 100644 --- a/src/database.sql +++ b/src/database.sql @@ -404,7 +404,8 @@ CREATE TABLE public.jobs ( deleted_at timestamp without time zone, deleted_by integer, aborted boolean DEFAULT false NOT NULL, - aborted_by integer + aborted_by integer, + timeout interval );