for source in self:
tg.create_task(source.fetch())
- # Process
+ # Run jobs
- async def process(self):
+ def _get_jobs(self, query, *args, **kwargs):
+ return self.db.fetch_many(Job, query, *args, **kwargs)
+
+ def _get_job(self, query, *args, **kwargs):
+ return self.db.fetch_one(Job, query, *args, **kwargs)
+
+ @property
+ def pending_jobs(self):
"""
- Processes all sources
+ Returns a generator of all pending jobs
"""
- for source in self:
- await source.process()
+ return self._get_jobs("""
+ SELECT
+ source_commit_jobs.*
+ FROM
+ sources
+ JOIN
+ source_commits ON sources.id = source_commits.source_id
+ JOIN
+ source_commit_jobs ON source_commits.id = source_commit_jobs.commit_id
+ WHERE
+ sources.deleted_at IS NULL
+ AND
+ source_commits.finished_at IS NULL
+ AND
+ (
+ source_commit_jobs.finished_at IS NULL
+ OR
+ source_commit_jobs.error IS NOT NULL
+ )
+ ORDER BY
+ sources.created_at, source_commits.created_at
+ """,
+ )
+
+ async def run_jobs(self):
+ """
+ Runs all unfinished jobs of all sources
+ """
+ # Run jobs one after the other
+ for job in self.pending_jobs:
+ await job.run()
class Source(base.DataObject):
source_id = %s
ORDER BY
id DESC
- """, self.id,
+ """, self.id, source=self,
)
def get_commit(self, revision):
# Store when we fetched
self._set_attribute_now("last_fetched_at")
- async def process(self):
- """
- Processes all commits that are not processed or had an error
- """
- commits = self.backend.sources._get_commits("""
- SELECT
- *
- FROM
- source_commits
- WHERE
- deleted_at IS NULL
- AND
- source_id = %s
- ORDER BY
- created_at ASC, id ASC
- """, self.id,
- )
-
- # XXX filter for non-finished commits
-
- for commit in commits:
- await commit.process()
-
class Commit(base.DataObject):
table = "source_commits"
# Jobs
- def _get_jobs(self, query, *args, **kwargs):
- return self.db.fetch_many(Job, query, *args, commit=self, **kwargs)
-
- def _get_job(self, query, *args, **kwargs):
- return self.db.fetch_one(Job, query, *args, commit=self, **kwargs)
-
def _create_job(self, action, name):
"""
Creates a new job
"""
- job = self._get_job("""
+ job = self.backend.sources._get_job("""
INSERT INTO
source_commit_jobs
(
@lazy_property
def jobs(self):
- jobs = self._get_jobs("""
+ jobs = self.backend.sources._get_jobs("""
SELECT
*
FROM
return set(jobs)
- async def process(self):
+ async def _job_has_finished(self, job):
+ """
+ Called when a job has finished
+ """
+ # Find any pending/failed jobs
for job in self.jobs:
- await job.run()
+ if job.is_pending():
+ return
+
+ if job.has_failed():
+ return
+
+ # If we get here, all jobs must have finished successfully
+ self._set_attribute_now("finished_at")
class Job(base.DataObject):
def name(self):
return self.data.name
+ # Finished At
+
+ @property
+ def finished_at(self):
+ return self.data.finished_at
+
+ # Error
+
+ @property
+ def error(self):
+ return self.data.error
+
+ # Status
+
+ def is_pending(self):
+ return not self.has_finished()
+
+ def has_finished(self):
+ if self.finished_at:
+ return True
+
+ return False
+
+ def has_failed(self):
+ return self.has_finished() and self.error
+
# Run
async def run(self):
else:
raise RuntimeError("Unhandled action: %s" % self.action)
+ # Mark as finished
+ self._set_attribute_now("finished_at")
+
+ # Report that this job has finished if there is no error
+ if not self.error:
+ await self.commit._job_has_finished(job=self)
+
# Launch all jobs (in the background)
if build:
self.backend.run_task(self.backend.builds.launch, [build])
"""
builds = self.backend.builds._get_builds("""
SELECT
- *
+ builds.*
FROM
repositories
LEFT JOIN
"""
upload = None
- # Set as processed
- self._set_attribute_now("processed_at")
-
try:
# Create a new temporary directory and check out the requested revision
with tempfile.TemporaryDirectory() as path:
commit_id integer NOT NULL,
action text NOT NULL,
name text NOT NULL,
- processed_at timestamp without time zone,
+ finished_at timestamp without time zone,
success boolean,
error text
);
state text DEFAULT 'pending'::text NOT NULL,
created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
deleted_at timestamp without time zone,
- build_group_id integer
+ build_group_id integer,
+ finished_at timestamp without time zone
);
gitweb text,
revision text NOT NULL,
branch text NOT NULL,
- last_check_at timestamp without time zone,
+ last_fetched_at timestamp without time zone,
repo_id integer NOT NULL,
created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
created_by integer NOT NULL,