]> git.ipfire.org Git - pbs.git/commitdiff
sources: Process jobs in one large loop
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 22 May 2023 20:21:36 +0000 (20:21 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 22 May 2023 20:21:36 +0000 (20:21 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/buildservice/sources.py
src/database.sql
src/scripts/pakfire-build-service

index 089e9ad171b445a92917d570d39d4f261382a9cb..a1992b83f76981324ee738a3ac28167b96bc2856 100644 (file)
@@ -139,14 +139,50 @@ class Sources(base.Object):
                        for source in self:
                                tg.create_task(source.fetch())
 
-       # Process
+       # Run jobs
 
-       async def process(self):
+       def _get_jobs(self, query, *args, **kwargs):
+               return self.db.fetch_many(Job, query, *args, **kwargs)
+
+       def _get_job(self, query, *args, **kwargs):
+               return self.db.fetch_one(Job, query, *args, **kwargs)
+
+       @property
+       def pending_jobs(self):
                """
-                       Processes all sources
+                       Returns a generator of all pending jobs
                """
-               for source in self:
-                       await source.process()
+               return self._get_jobs("""
+                       SELECT
+                               source_commit_jobs.*
+                       FROM
+                               sources
+                       JOIN
+                               source_commits ON sources.id = source_commits.source_id
+                       JOIN
+                               source_commit_jobs ON source_commits.id = source_commit_jobs.commit_id
+                       WHERE
+                               sources.deleted_at IS NULL
+                       AND
+                               source_commits.finished_at IS NULL
+                       AND
+                               (
+                                       source_commit_jobs.finished_at IS NULL
+                               OR
+                                       source_commit_jobs.error IS NOT NULL
+                               )
+                       ORDER BY
+                               sources.created_at, source_commits.created_at
+                       """,
+               )
+
+       async def run_jobs(self):
+               """
+                       Runs all unfinished jobs of all sources
+               """
+               # Run jobs one after the other
+               for job in self.pending_jobs:
+                       await job.run()
 
 
 class Source(base.DataObject):
@@ -345,7 +381,7 @@ class Source(base.DataObject):
                                source_id = %s
                        ORDER BY
                                id DESC
-                       """, self.id,
+                       """, self.id, source=self,
                )
 
        def get_commit(self, revision):
@@ -395,29 +431,6 @@ class Source(base.DataObject):
                        # Store when we fetched
                        self._set_attribute_now("last_fetched_at")
 
-       async def process(self):
-               """
-                       Processes all commits that are not processed or had an error
-               """
-               commits = self.backend.sources._get_commits("""
-                       SELECT
-                               *
-                       FROM
-                               source_commits
-                       WHERE
-                               deleted_at IS NULL
-                       AND
-                               source_id = %s
-                       ORDER BY
-                               created_at ASC, id ASC
-                       """, self.id,
-               )
-
-               # XXX filter for non-finished commits
-
-               for commit in commits:
-                       await commit.process()
-
 
 class Commit(base.DataObject):
        table = "source_commits"
@@ -588,17 +601,11 @@ class Commit(base.DataObject):
 
        # Jobs
 
-       def _get_jobs(self, query, *args, **kwargs):
-               return self.db.fetch_many(Job, query, *args, commit=self, **kwargs)
-
-       def _get_job(self, query, *args, **kwargs):
-               return self.db.fetch_one(Job, query, *args, commit=self, **kwargs)
-
        def _create_job(self, action, name):
                """
                        Creates a new job
                """
-               job = self._get_job("""
+               job = self.backend.sources._get_job("""
                        INSERT INTO
                                source_commit_jobs
                        (
@@ -621,7 +628,7 @@ class Commit(base.DataObject):
 
        @lazy_property
        def jobs(self):
-               jobs = self._get_jobs("""
+               jobs = self.backend.sources._get_jobs("""
                        SELECT
                                *
                        FROM
@@ -633,9 +640,20 @@ class Commit(base.DataObject):
 
                return set(jobs)
 
-       async def process(self):
+       async def _job_has_finished(self, job):
+               """
+                       Called when a job has finished
+               """
+               # Find any pending/failed jobs
                for job in self.jobs:
-                       await job.run()
+                       if job.is_pending():
+                               return
+
+                       if job.has_failed():
+                               return
+
+               # If we get here, all jobs must have finished successfully
+               self._set_attribute_now("finished_at")
 
 
 class Job(base.DataObject):
@@ -665,6 +683,32 @@ class Job(base.DataObject):
        def name(self):
                return self.data.name
 
+       # Finished At
+
+       @property
+       def finished_at(self):
+               return self.data.finished_at
+
+       # Error
+
+       @property
+       def error(self):
+               return self.data.error
+
+       # Status
+
+       def is_pending(self):
+               return not self.has_finished()
+
+       def has_finished(self):
+               if self.finished_at:
+                       return True
+
+               return False
+
+       def has_failed(self):
+               return self.has_finished() and self.error
+
        # Run
 
        async def run(self):
@@ -685,6 +729,13 @@ class Job(base.DataObject):
                        else:
                                raise RuntimeError("Unhandled action: %s" % self.action)
 
+                       # Mark as finished
+                       self._set_attribute_now("finished_at")
+
+                       # Report that this job has finished if there is no error
+                       if not self.error:
+                               await self.commit._job_has_finished(job=self)
+
                # Launch all jobs (in the background)
                if build:
                        self.backend.run_task(self.backend.builds.launch, [build])
@@ -695,7 +746,7 @@ class Job(base.DataObject):
                """
                builds = self.backend.builds._get_builds("""
                        SELECT
-                               *
+                               builds.*
                        FROM
                                repositories
                        LEFT JOIN
@@ -733,9 +784,6 @@ class Job(base.DataObject):
                """
                upload = None
 
-               # Set as processed
-               self._set_attribute_now("processed_at")
-
                try:
                        # Create a new temporary directory and check out the requested revision
                        with tempfile.TemporaryDirectory() as path:
index 401de7616a8b8550307dcc3d7c5342a73bafec7b..504dea2ecbffbc6419564bfc6121b2f95afe3d62 100644 (file)
@@ -935,7 +935,7 @@ CREATE TABLE public.source_commit_jobs (
     commit_id integer NOT NULL,
     action text NOT NULL,
     name text NOT NULL,
-    processed_at timestamp without time zone,
+    finished_at timestamp without time zone,
     success boolean,
     error text
 );
@@ -977,7 +977,8 @@ CREATE TABLE public.source_commits (
     state text DEFAULT 'pending'::text NOT NULL,
     created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
     deleted_at timestamp without time zone,
-    build_group_id integer
+    build_group_id integer,
+    finished_at timestamp without time zone
 );
 
 
@@ -1012,7 +1013,7 @@ CREATE TABLE public.sources (
     gitweb text,
     revision text NOT NULL,
     branch text NOT NULL,
-    last_check_at timestamp without time zone,
+    last_fetched_at timestamp without time zone,
     repo_id integer NOT NULL,
     created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
     created_by integer NOT NULL,
index 883421b8fd4dab5f4652924671b0a56289915c5d..89f6b0b9f136f3a8f81ca086d24a64708f459dff 100644 (file)
@@ -50,7 +50,7 @@ class Cli(object):
 
                        # Sources
                        "sources:fetch"       : self.backend.sources.fetch,
-                       "sources:process"     : self.backend.sources.process,
+                       "sources:run-jobs"    : self.backend.sources.run_jobs,
 
                        # Sync
                        "sync"                : self.backend.sync,