From: Michael Tremer Date: Wed, 16 Apr 2025 09:38:53 +0000 (+0000) Subject: sources: Bring back fetching the Git repositories X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=328fb7eb9ba7c636f2a16d420243e9387c84960f;p=pbs.git sources: Bring back fetching the Git repositories Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py index fc266de7..b1f80bf3 100644 --- a/src/buildservice/__init__.py +++ b/src/buildservice/__init__.py @@ -106,7 +106,7 @@ class Backend(object): self.run_periodic_task(300, self.mirrors.check) # Regularly fetch sources - #self.run_periodic_task(300, self.sources.fetch) + self.run_periodic_task(300, self.sources.fetch) # Regularly check for new releases # XXX Disabled for now diff --git a/src/buildservice/sources.py b/src/buildservice/sources.py index 033a385b..117116d3 100644 --- a/src/buildservice/sources.py +++ b/src/buildservice/sources.py @@ -39,19 +39,20 @@ VALID_TAGS = ( class Sources(base.Object): def __aiter__(self): - sources = self._get_sources(""" - SELECT - * - FROM - sources - WHERE - deleted_at IS NULL - ORDER BY - created_at - """, + stmt = ( + sqlalchemy + .select( + Source, + ) + .where( + Source.deleted_at == None, + ) + .order_by( + Source.created_at.asc(), + ) ) - return aiter(sources) + return self.db.fetch(stmt) async def get_by_slug(self, slug): stmt = ( @@ -97,10 +98,12 @@ class Sources(base.Object): """ Fetches any new commits from all sources """ - # Fetch all sources concurrently - async with asyncio.TaskGroup() as tg: - for source in self: - tg.create_task(source.fetch()) + # Fetch all sources + async for source in self: + await source.fetch() + + # Commit! + await self.db.commit() # Process any pending jobs if run_jobs: @@ -264,12 +267,15 @@ class Source(database.Base, database.BackendMixin, database.SoftDeleteMixin): await self.git.show_attribute(revision, r"%aI"), ) + # Convert the date to UTC + date = date.astimezone(datetime.timezone.utc).replace(tzinfo=None) + # Create a new build group - group = self.backend.builds.create_group() + builds = await self.backend.builds.create_group() # Insert into the database commit = await self.db.insert( - Commit, + SourceCommit, source = self, revision = revision, author = author, @@ -277,7 +283,7 @@ class Source(database.Base, database.BackendMixin, database.SoftDeleteMixin): subject = subject, body = body, date = date, - group = group, + builds = builds, ) # If we are processing the initial commit, we get a list of all files in the tree @@ -312,7 +318,7 @@ class Source(database.Base, database.BackendMixin, database.SoftDeleteMixin): raise RuntimeError("Unhandled status %s for file %s" % (status, filename)) # Create job - commit._create_job(action, name) + await commit._create_job(action=action, name=name) return commit @@ -362,26 +368,25 @@ class Source(database.Base, database.BackendMixin, database.SoftDeleteMixin): else: await self.git.clone() - with self.db.transaction(): - # Did we already import something? - initial_commit = not self.revision + # Did we already import something? + initial_commit = not self.revision - # Determine which commits there are to process - revisions = await self.git.revisions(self.revision, self.branch) + # Determine which commits there are to process + revisions = await self.git.revisions(self.revision, self.branch) - # Import all revisions - for revision in revisions: - # Import the commit - await self._create_commit(revision, initial_commit=initial_commit) + # Import all revisions + for revision in revisions: + # Import the commit + await self._create_commit(revision, initial_commit=initial_commit) - # Store the updated revision - self._set_attribute("revision", revision) + # Store the updated revision + self.revision = revision - # Only the first revision would the initial commit - initial_commit = False + # Only the first revision would the initial commit + initial_commit = False - # Store when we fetched - self._set_attribute_now("last_fetched_at") + # Store when we fetched + self.last_fetched_at = sqlalchemy.func.current_timestamp() class SourceCommit(database.Base, database.BackendMixin, database.SoftDeleteMixin): @@ -550,31 +555,18 @@ class SourceCommit(database.Base, database.BackendMixin, database.SoftDeleteMixi # Jobs - async def _create_job(self, action, name): + async def _create_job(self, **kwargs): """ Creates a new job """ - log.info("%s: %s: Created '%s' job for '%s'" \ - % (self.source, self.revision, action, name)) - - job = await self.backend.sources._get_job(""" - INSERT INTO - source_commit_jobs - ( - commit_id, - action, - name - ) - VALUES - ( - %s, %s, %s - ) - RETURNING * - """, self.id, action, name, + job = await self.db.insert( + SourceJob, + commit = self, + **kwargs, ) - # Append to cache - self.jobs.add(job) + log.info("%s: %s: Created '%s' job for '%s'" \ + % (self.source, self.revision, job.action, job.name)) return job