class Sources(base.Object):
def __aiter__(self):
- sources = self._get_sources("""
- SELECT
- *
- FROM
- sources
- WHERE
- deleted_at IS NULL
- ORDER BY
- created_at
- """,
+ stmt = (
+ sqlalchemy
+ .select(
+ Source,
+ )
+ .where(
+ Source.deleted_at == None,
+ )
+ .order_by(
+ Source.created_at.asc(),
+ )
)
- return aiter(sources)
+ return self.db.fetch(stmt)
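(Aside, not part of this patch: __aiter__() can stay synchronous here because self.db.fetch() is assumed to return an async iterator over ORM objects. A minimal sketch of such a helper, using SQLAlchemy's async session API, could look like the following; the "session" attribute name is an assumption.)

    # Hypothetical db.fetch() helper; streams ORM objects instead of buffering them
    async def fetch(self, stmt):
        result = await self.session.stream_scalars(stmt)
        async for row in result:
            yield row

    # Callers can then simply iterate the collection:
    #   async for source in backend.sources:
    #       await source.fetch()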
async def get_by_slug(self, slug):
stmt = (
"""
Fetches any new commits from all sources
"""
- # Fetch all sources concurrently
- async with asyncio.TaskGroup() as tg:
- for source in self:
- tg.create_task(source.fetch())
+ # Fetch all sources
+ async for source in self:
+ await source.fetch()
+
+ # Commit!
+ await self.db.commit()
# Process any pending jobs
if run_jobs:
await self.git.show_attribute(revision, r"%aI"),
)
+ # Convert the date to UTC
+ date = date.astimezone(datetime.timezone.utc).replace(tzinfo=None)
+
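(A quick worked example of the conversion above; the sample date is invented. Git's %aI format carries the author's UTC offset, and the two calls fold it into a naive UTC datetime.)

    import datetime

    # Author date as printed by `git show -s --format=%aI` (hypothetical value)
    date = datetime.datetime.fromisoformat("2024-05-01T12:30:00+02:00")

    # Shift to UTC, then drop tzinfo so a naive value is stored
    date = date.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    print(date)  # 2024-05-01 10:30:00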
# Create a new build group
- group = self.backend.builds.create_group()
+ builds = await self.backend.builds.create_group()
# Insert into the database
commit = await self.db.insert(
- Commit,
+ SourceCommit,
source = self,
revision = revision,
author = author,
subject = subject,
body = body,
date = date,
- group = group,
+ builds = builds,
)
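(For context, and only as an assumption about this codebase: a db.insert() helper compatible with the call above could be as small as this sketch, where "session" is an assumed AsyncSession attribute.)

    async def insert(self, model, **kwargs):
        # Instantiate the mapped class with the given column values
        obj = model(**kwargs)
        self.session.add(obj)

        # Flush so primary keys and server-side defaults are populated
        await self.session.flush()

        return obj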
# If we are processing the initial commit, we get a list of all files in the tree
raise RuntimeError("Unhandled status %s for file %s" % (status, filename))
# Create job
- commit._create_job(action, name)
+ await commit._create_job(action=action, name=name)
return commit
else:
await self.git.clone()
- with self.db.transaction():
- # Did we already import something?
- initial_commit = not self.revision
+ # Did we already import something?
+ initial_commit = not self.revision
- # Determine which commits there are to process
- revisions = await self.git.revisions(self.revision, self.branch)
+ # Determine which commits there are to process
+ revisions = await self.git.revisions(self.revision, self.branch)
- # Import all revisions
- for revision in revisions:
- # Import the commit
- await self._create_commit(revision, initial_commit=initial_commit)
+ # Import all revisions
+ for revision in revisions:
+ # Import the commit
+ await self._create_commit(revision, initial_commit=initial_commit)
- # Store the updated revision
- self._set_attribute("revision", revision)
+ # Store the updated revision
+ self.revision = revision
- # Only the first revision would the initial commit
- initial_commit = False
+ # Only the first revision would be the initial commit
+ initial_commit = False
- # Store when we fetched
- self._set_attribute_now("last_fetched_at")
+ # Store when we fetched
+ self.last_fetched_at = sqlalchemy.func.current_timestamp()
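(Illustration only; the object and session names are placeholders. Assigning sqlalchemy.func.current_timestamp() to a mapped attribute embeds the SQL expression into the UPDATE, so the database itself picks the timestamp.)

    import sqlalchemy

    source.last_fetched_at = sqlalchemy.func.current_timestamp()

    # The UPDATE emitted at flush time carries the expression itself,
    # e.g. SET last_fetched_at = CURRENT_TIMESTAMP
    await session.flush()

    # Re-load the attribute if the server-generated value is needed in Python
    await session.refresh(source)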
class SourceCommit(database.Base, database.BackendMixin, database.SoftDeleteMixin):
# Jobs
- async def _create_job(self, action, name):
+ async def _create_job(self, **kwargs):
"""
Creates a new job
"""
- log.info("%s: %s: Created '%s' job for '%s'" \
- % (self.source, self.revision, action, name))
-
- job = await self.backend.sources._get_job("""
- INSERT INTO
- source_commit_jobs
- (
- commit_id,
- action,
- name
- )
- VALUES
- (
- %s, %s, %s
- )
- RETURNING *
- """, self.id, action, name,
+ job = await self.db.insert(
+ SourceJob,
+ commit = self,
+ **kwargs,
)
- # Append to cache
- self.jobs.add(job)
+ log.info("%s: %s: Created '%s' job for '%s'" \
+ % (self.source, self.revision, job.action, job.name))
return job
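(Hypothetical usage of the refactored helper; the action and name values are made up. The keyword arguments are passed straight through to the SourceJob row.)

    job = await commit._create_job(action="build", name="kernel")
    # logs: "<source>: <revision>: Created 'build' job for 'kernel'"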