# Cleanup regularly
self.run_periodic_task(3600, self.cleanup)
+ # Automatically abort any jobs that run forever
+ self.run_periodic_task(60, self.jobs.abort)
+
def read_config(self, path):
c = configparser.ConfigParser()
#!/usr/bin/python
import asyncio
+import datetime
import itertools
import logging
import os
return list(builds)
async def create(self, repo, package, owner=None, group=None, test=False,
- disable_test_builds=False):
+ disable_test_builds=False, timeout=None):
"""
Creates a new build based on the given distribution and package
"""
if not package.path or not await self.backend.exists(package.path):
raise RuntimeError("Package %s does not exist (path = %s)" % (package, package.path))
+ # Set a default timeout
+ if timeout is None:
+ timeout = datetime.timedelta(hours=3)
+
build = self._get_build("""
INSERT INTO
builds
group.builds.append(build)
# Create all jobs
- build._create_jobs()
+ build._create_jobs(timeout=timeout)
if not build.is_test():
# Deprecate previous builds
return self._get_jobs("SELECT * FROM jobs \
WHERE build_id = %s", self.id)
- def _create_jobs(self):
+ def _create_jobs(self, **kwargs):
"""
Called after a build has been created and creates all jobs
"""
# Create the jobs
for arch in self.arches:
- self.backend.jobs.create(self, arch)
+ # Forward any extra options (e.g. timeout) through to job creation
+ self.backend.jobs.create(self, arch, **kwargs)
async def _job_finished(self, job):
"""
def _get_jobs(self, query, *args, **kwargs):
return self.db.fetch_many(Job, query, *args, **kwargs)
- def create(self, build, arch, superseeds=None):
+ def create(self, build, arch, superseeds=None, timeout=None):
job = self._get_job("""
INSERT INTO
jobs
(
build_id,
- arch
+ arch,
+ timeout
)
VALUES
(
- %s,
- %s
+ %s, %s, %s
)
RETURNING *""",
build,
arch,
+ timeout,
# Populate cache
build=build,
# Request dispatch
await self.backend.jobs.queue.dispatch()
+ async def abort(self):
+ """
+ Periodically called to abort jobs that have exceeded their timeout.
+
+ Selects all jobs that are not deleted, have been started but never
+ finished, have a timeout configured, and whose started_at + timeout
+ lies in the past (e.g. because the builder crashed and never
+ reported back), then aborts each of them.
+ """
+ log.debug("Aborting timed-out jobs...")
+
+ jobs = self._get_jobs("""
+ SELECT
+ *
+ FROM
+ jobs
+ WHERE
+ deleted_at IS NULL
+ AND
+ finished_at IS NULL
+ AND
+ started_at IS NOT NULL
+ AND
+ timeout IS NOT NULL
+ AND
+ started_at + timeout < CURRENT_TIMESTAMP
+ """)
+
+ # Abort them all...
+ # NOTE(review): aborts run sequentially; if one job.abort() raises,
+ # the remaining jobs are skipped until the next periodic run.
+ for job in jobs:
+ await job.abort()
+
+
class Queue(base.Object):
# Locked when the queue is being processed