From: Michael Tremer
Date: Mon, 22 May 2023 20:21:36 +0000 (+0000)
Subject: sources: Process jobs in one large loop
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=1da6e9fd6a7850df5e67ba5c879fc3d3462bfa28;p=pbs.git

sources: Process jobs in one large loop

Signed-off-by: Michael Tremer
---

diff --git a/src/buildservice/sources.py b/src/buildservice/sources.py
index 089e9ad1..a1992b83 100644
--- a/src/buildservice/sources.py
+++ b/src/buildservice/sources.py
@@ -139,14 +139,50 @@ class Sources(base.Object):
 		for source in self:
 			tg.create_task(source.fetch())
 
-	# Process
+	# Run jobs
 
-	async def process(self):
+	def _get_jobs(self, query, *args, **kwargs):
+		return self.db.fetch_many(Job, query, *args, **kwargs)
+
+	def _get_job(self, query, *args, **kwargs):
+		return self.db.fetch_one(Job, query, *args, **kwargs)
+
+	@property
+	def pending_jobs(self):
 		"""
-			Processes all sources
+			Returns a generator of all pending jobs
 		"""
-		for source in self:
-			await source.process()
+		return self._get_jobs("""
+			SELECT
+				source_commit_jobs.*
+			FROM
+				sources
+			JOIN
+				source_commits ON sources.id = source_commits.source_id
+			JOIN
+				source_commit_jobs ON source_commits.id = source_commit_jobs.commit_id
+			WHERE
+				sources.deleted_at IS NULL
+			AND
+				source_commits.finished_at IS NULL
+			AND
+				(
+					source_commit_jobs.finished_at IS NULL
+					OR
+					source_commit_jobs.error IS NOT NULL
+				)
+			ORDER BY
+				sources.created_at, source_commits.created_at
+			""",
+		)
+
+	async def run_jobs(self):
+		"""
+			Runs all unfinished jobs of all sources
+		"""
+		# Run jobs one after the other
+		for job in self.pending_jobs:
+			await job.run()
 
 
 class Source(base.DataObject):
@@ -345,7 +381,7 @@ class Source(base.DataObject):
 				source_id = %s
 			ORDER BY
 				id DESC
-			""", self.id,
+			""", self.id, source=self,
 		)
 
 	def get_commit(self, revision):
@@ -395,29 +431,6 @@ class Source(base.DataObject):
 		# Store when we fetched
 		self._set_attribute_now("last_fetched_at")
 
-	async def process(self):
-		"""
-			Processes all commits that are not processed or had an error
-		"""
-		commits = self.backend.sources._get_commits("""
-			SELECT
-				*
-			FROM
-				source_commits
-			WHERE
-				deleted_at IS NULL
-			AND
-				source_id = %s
-			ORDER BY
-				created_at ASC, id ASC
-			""", self.id,
-		)
-
-		# XXX filter for non-finished commits
-
-		for commit in commits:
-			await commit.process()
-
 
 class Commit(base.DataObject):
 	table = "source_commits"
@@ -588,17 +601,11 @@ class Commit(base.DataObject):
 
 	# Jobs
 
-	def _get_jobs(self, query, *args, **kwargs):
-		return self.db.fetch_many(Job, query, *args, commit=self, **kwargs)
-
-	def _get_job(self, query, *args, **kwargs):
-		return self.db.fetch_one(Job, query, *args, commit=self, **kwargs)
-
 	def _create_job(self, action, name):
 		"""
 			Creates a new job
 		"""
-		job = self._get_job("""
+		job = self.backend.sources._get_job("""
 			INSERT INTO
 				source_commit_jobs
 			(
@@ -621,7 +628,7 @@ class Commit(base.DataObject):
 
 	@lazy_property
 	def jobs(self):
-		jobs = self._get_jobs("""
+		jobs = self.backend.sources._get_jobs("""
 			SELECT
 				*
 			FROM
@@ -633,9 +640,20 @@ class Commit(base.DataObject):
 
 		return set(jobs)
 
-	async def process(self):
+	async def _job_has_finished(self, job):
+		"""
+			Called when a job has finished
+		"""
+		# Find any pending/failed jobs
 		for job in self.jobs:
-			await job.run()
+			if job.is_pending():
+				return
+
+			if job.has_failed():
+				return
+
+		# If we get here, all jobs must have finished successfully
+		self._set_attribute_now("finished_at")
 
 
 class Job(base.DataObject):
@@ -665,6 +683,32 @@ class Job(base.DataObject):
 	def name(self):
 		return self.data.name
 
+	# Finished At
+
+	@property
+	def finished_at(self):
+		return self.data.finished_at
+
+	# Error
+
+	@property
+	def error(self):
+		return self.data.error
+
+	# Status
+
+	def is_pending(self):
+		return not self.has_finished()
+
+	def has_finished(self):
+		if self.finished_at:
+			return True
+
+		return False
+
+	def has_failed(self):
+		return self.has_finished() and self.error
+
 	# Run
 
 	async def run(self):
@@ -685,6 +729,13 @@ class Job(base.DataObject):
 		else:
 			raise RuntimeError("Unhandled action: %s" % self.action)
 
+		# Mark as finished
+		self._set_attribute_now("finished_at")
+
+		# Report that this job has finished if there is no error
+		if not self.error:
+			await self.commit._job_has_finished(job=self)
+
 		# Launch all jobs (in the background)
 		if build:
 			self.backend.run_task(self.backend.builds.launch, [build])
@@ -695,7 +746,7 @@ class Job(base.DataObject):
 		"""
 		builds = self.backend.builds._get_builds("""
 			SELECT
-				*
+				builds.*
 			FROM
 				repositories
 			LEFT JOIN
@@ -733,9 +784,6 @@ class Job(base.DataObject):
 		"""
 		upload = None
 
-		# Set as processed
-		self._set_attribute_now("processed_at")
-
 		try:
 			# Create a new temporary directory and check out the requested revision
 			with tempfile.TemporaryDirectory() as path:
diff --git a/src/database.sql b/src/database.sql
index 401de761..504dea2e 100644
--- a/src/database.sql
+++ b/src/database.sql
@@ -935,7 +935,7 @@ CREATE TABLE public.source_commit_jobs (
     commit_id integer NOT NULL,
     action text NOT NULL,
     name text NOT NULL,
-    processed_at timestamp without time zone,
+    finished_at timestamp without time zone,
     success boolean,
     error text
 );
@@ -977,7 +977,8 @@ CREATE TABLE public.source_commits (
     state text DEFAULT 'pending'::text NOT NULL,
     created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
     deleted_at timestamp without time zone,
-    build_group_id integer
+    build_group_id integer,
+    finished_at timestamp without time zone
 );
 
 
@@ -1012,7 +1013,7 @@ CREATE TABLE public.sources (
     gitweb text,
     revision text NOT NULL,
     branch text NOT NULL,
-    last_check_at timestamp without time zone,
+    last_fetched_at timestamp without time zone,
     repo_id integer NOT NULL,
     created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
     created_by integer NOT NULL,
diff --git a/src/scripts/pakfire-build-service b/src/scripts/pakfire-build-service
index 883421b8..89f6b0b9 100644
--- a/src/scripts/pakfire-build-service
+++ b/src/scripts/pakfire-build-service
@@ -50,7 +50,7 @@ class Cli(object):
 
 			# Sources
 			"sources:fetch" : self.backend.sources.fetch,
-			"sources:process" : self.backend.sources.process,
+			"sources:run-jobs" : self.backend.sources.run_jobs,
 
 			# Sync
 			"sync" : self.backend.sync,
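
With this change, source jobs are no longer processed per source and per commit: Sources.run_jobs() pulls every unfinished (or failed) source_commit_jobs row in a single query and runs the jobs one after the other, and the CLI verb moves from "sources:process" to "sources:run-jobs". A minimal sketch of how the new entry point might be driven from a small async script follows; the Backend import path and constructor here are assumptions for illustration, not taken from this patch:

	import asyncio

	# Assumed import path; the real service constructs the backend in
	# src/scripts/pakfire-build-service.
	from buildservice import Backend

	async def main():
		backend = Backend()

		# Fetch all sources first (the "sources:fetch" verb), then run every
		# pending job across all sources in one loop ("sources:run-jobs").
		await backend.sources.fetch()
		await backend.sources.run_jobs()

	asyncio.run(main())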