self.run_periodic_task(3600, self.cleanup)
# Automatically abort any jobs that run for forever
- #self.run_periodic_task(60, self.jobs.abort)
+ self.run_periodic_task(60, self.jobs.abort)
def read_config(self, path):
c = configparser.ConfigParser()
"""
log.debug("Aborting timed-out jobs...")
- jobs = self._get_jobs("""
- SELECT
- *
- FROM
- jobs
- WHERE
- deleted_at IS NULL
- AND
- finished_at IS NULL
- AND
- started_at IS NOT NULL
- AND
- timeout IS NOT NULL
- AND
- started_at + timeout < CURRENT_TIMESTAMP
- """)
+ stmt = (
+ sqlalchemy
+ .select(
+ Job
+ )
+ .where(
+ # Don't care about deleted jobs
+ Job.deleted_at == None,
+
+ # Jobs must be running
+ Job.started_at != None,
+ Job.finished_at == None,
+
+ # Jobs must have a timeout
+ Job.timeout != None,
+
+ # The timeout must have passed
+ Job.started_at + Job.timeout < sqlalchemy.func.current_timestamp(),
+ )
+ )
# Abort them all...
- async for job in jobs:
+ async for job in self.db.fetch(stmt):
await job.abort()