import fnmatch
import logging
import os
-import pakfire
-import pakfire.config
import re
-import shutil
-import subprocess
import tempfile
from . import base
)
class Sources(base.Object):
+ def __iter__(self):
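+		"""
+			Iterates over all sources that have not been deleted, oldest first
+		"""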
+ sources = self._get_sources("""
+ SELECT
+ *
+ FROM
+ sources
+ WHERE
+ deleted_at IS NULL
+ ORDER BY
+ created_at
+ """,
+ )
+
+ return iter(sources)
+
def _get_sources(self, query, *args, **kwargs):
return self.db.fetch_many(Source, query, *args, **kwargs)
""", id,
)
+ # Check
+
+ async def check(self):
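+		"""
+			Checks all sources for any new commits
+		"""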
+ for source in self:
+ await source.check()
+
+ # Process
+
+ async def process(self):
+ """
+ Processes all sources
+ """
+ for source in self:
+ await source.process()
+
class Source(base.DataObject):
table = "sources"
def __str__(self):
return self.name
+ @lazy_property
+ def git(self):
+ # Setup the Git repository
+ return Git(self.backend, self.path, self.url, self.branch)
+
# Name
def get_name(self):
def slug(self):
return self.data.slug
+ # Distro
+
@property
def distro(self):
return self.repo.distro
source_id = %s
AND
revision = %s
- """, self.id, revision,
+ """, self.id, revision, source=self,
)
- # Populate cache
- if commit:
- commit.source = self
-
return commit
+ # Check
+
+ @property
+ def last_check_at(self):
+ return self.data.last_check_at
+
# Operations
- async def fetch(self):
+ async def check(self):
"""
Fetches any new commits from this source
"""
log.debug("Fetching %s" % self)
- # Setup the Git repository
- git = Git(self.backend, self.path, self.url, self.branch)
-
# Update or clone the repository
- if git.is_cloned():
- await git.fetch()
+ if self.git.is_cloned():
+ await self.git.fetch()
else:
- await git.clone()
+ await self.git.clone()
		# Determine which commits need to be processed
- revisions = [
- await git.show_ref(self.branch),
- ]
-
- # Process all revisions one after the other
- for revision in revisions:
- with self.db.transaction():
- await self._process_revision(git, revision)
-
- async def _process_revision(self, git, revision):
- """
- Processes a certain revision
- """
- # Create the commit metadata
- commit = await self._import_commit(git, revision)
-
- # Find changed files
- changed_files = await git.changed_files(revision, filter="*/*.nm")
-
- # Exit if there have not been any changes
- if not changed_files:
- log.debug("%s has no changes")
- return
+ revisions = await self.git.revisions(self.revision, self.branch)
- deleted_packages = []
- dist_files = []
+ with self.db.transaction():
+ for revision in revisions:
+ # Import the commit
+ await self._create_commit(revision)
- # Run through all changed files
- for status, filename in changed_files:
- # Collect the names of all deleted packages
- if status == "D":
- deleted_packages.append(
- self._filename_to_package(filename),
- )
-
- # For any other changed makefiles, we collect their paths
- else:
- dist_files.append(filename)
-
- # Deprecate any deleted packages
- if deleted_packages:
- self._deprecate_packages(deleted_packages)
-
- # Create builds from all other changed files
- if dist_files:
- await self._dist_files(git, revision, dist_files)
-
- def _filename_to_package(self, filename):
- """
- Maps a filename to a package name
- """
- name = os.path.dirname(filename)
-
- # Check that the file part matches
- if not filename.endswith("/%s.nm" % name):
- raise ValueError("Invalid package name")
+ # Store the updated revision
+ self._set_attribute("revision", revision)
- return name
+ # Store when we checked
+ self._set_attribute_now("last_check_at")
- def _deprecate_packages(self, names):
+ async def process(self):
"""
- Called to deprecate any packages that match any of the names
+			Processes all commits that have not been processed yet or failed with an error
"""
- builds = self.backend.builds._get_builds("""
+ commits = self.backend.sources._get_commits("""
SELECT
*
FROM
- builds
- LEFT JOIN
- packages ON builds.pkg_id = packages.id
+ source_commits
WHERE
- builds.deleted_at IS NULL
- AND
- builds.deprecated_by IS NULL
- AND
- builds.build_repo_id = %s
+ deleted_at IS NULL
AND
- packages.name = ANY(%s)
- """, self.repo, names,
+ source_id = %s
+ ORDER BY
+ created_at ASC, id ASC
+ """, self.id,
)
- # Deprecate all matching builds
- for build in builds:
- build.deprecate()
-
- async def _dist_files(self, git, revision, files):
- """
- Runs "pakfire dist" on all given files at the given revision
- """
- with tempfile.TemporaryDirectory() as path:
- # Checkout the revision into a new temporary directory
- await git.checkout(revision, path)
-
- uploads = []
-
- # Launch a Pakfire instance with the repository configuration
- try:
- with self.repo.pakfire() as p:
- # Walk through all files one by one
- for file in files:
- # Run dist()
- file = p.dist(file)
+ # XXX filter for non-finished commits
- # Upload the file
- uploads.append(
- await upload.create_from_file(file),
- )
-
- # Remove the source file
- os.unlink(file)
-
- finally:
- # Delete any uploads
- await asyncio.gather(
- *(upload.delete() for upload in uploads)
- )
+ for commit in commits:
+ await commit.process()
class Commit(base.DataObject):
table = "source_commits"
+ def __str__(self):
+ return "%s - %s" % (self.revision[:8], self.subject)
+
+ # Revision
+
@property
def revision(self):
return self.data.revision
+ # Source
+
@lazy_property
def source(self):
return self.backend.sources.get_by_id(self.data.source_id)
+ # State
+
def set_state(self, state):
self._set_attribute("state", state)
state = property(lambda s: s.data.state, set_state)
+ # Author
+
@lazy_property
def author(self):
return self.backend.users.get_by_email(self.data.author) or self.data.author
+ # Committer
+
@lazy_property
def committer(self):
return self.backend.users.get_by_email(self.data.committer) or self.data.committer
+ # Date
+
+ @property
+ def date(self):
+ return self.data.date
+
+ # Subject
+
@property
def subject(self):
return self.data.subject.strip()
+ # Body
+
@property
def body(self):
return self.data.body.strip()
raise ValueError("Unknown tag: %s" % tag)
# Compile regex
- r = re.compile("^%s:? (.*)$" % tag, re.IGNORECASE)
+ r = re.compile(r"^%s:? (.*)$" % tag, re.IGNORECASE)
values = []
for line in self.body.splitlines():
"""
return self.get_tag("Fixes")
- @property
- def date(self):
- return self.data.date
+ # Builds
+
+ @lazy_property
+ def builds(self):
+ if self.data.build_group_id:
+ return self.backend.builds.groups.get_by_id(self.data.build_group_id)
+
+ # Jobs
+
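+	# Passing commit=self pre-populates the commit reference on any fetched jobs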
+ def _get_jobs(self, query, *args, **kwargs):
+ return self.db.fetch_many(Job, query, *args, commit=self, **kwargs)
+
+ def _get_job(self, query, *args, **kwargs):
+ return self.db.fetch_one(Job, query, *args, commit=self, **kwargs)
+
+ def _create_job(self, action, name):
+ """
+ Creates a new job
+ """
+ job = self._get_job("""
+ INSERT INTO
+ source_commit_jobs
+ (
+ commit_id,
+ action,
+ name
+ )
+ VALUES
+ (
+ %s, %s, %s
+ )
+ RETURNING *
+ """, self.id, action, name,
+ )
+
+ # Append to cache
+ self.jobs.add(job)
+
+ return job
+
+ @lazy_property
+ def jobs(self):
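+		"""
+			Returns all jobs of this commit
+		"""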
+ jobs = self._get_jobs("""
+ SELECT
+ *
+ FROM
+ source_commit_jobs
+ WHERE
+ commit_id = %s
+ """, self.id,
+ )
+
+ return set(jobs)
+
+ async def process(self):
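+		"""
+			Runs all jobs of this commit
+		"""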
+ for job in self.jobs:
+ await job.run()
+
+
+class Job(base.DataObject):
+ table = "source_commit_jobs"
+
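+	# Give jobs a readable representation for log messages
+	def __str__(self):
+		return "%s %s" % (self.action, self.name)
+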
+ # Source
+
+ @lazy_property
+ def source(self):
+ return self.commit.source
+
+ # Commit
@lazy_property
- def packages(self):
- return self.backend.packages._get_packages("SELECT * FROM packages \
- WHERE commit_id = %s", self.id)
+ def commit(self):
+ return self.backend.sources.get_commit_by_id(self.data.commit_id)
+
+ # Action
+
+ @property
+ def action(self):
+ return self.data.action
+
+ # Name
+
+ @property
+ def name(self):
+ return self.data.name
+
+ # Run
+
+ async def run(self):
+ """
+ Runs the job
+ """
+ build = None
+
+ log.debug("Running %s..." % self)
+ with self.db.transaction():
+ if self.action == "dist":
+				# dist returns the new build, which is launched after the transaction
+				build = await self._run_dist()
+
+ elif self.action == "deprecate":
+ await self._run_deprecate()
+
+ else:
+ raise RuntimeError("Unhandled action: %s" % self.action)
+
+ # Launch all jobs (in the background)
+ if build:
+ self.backend.run_task(self.backend.builds.launch, [build])
+
+ async def _run_deprecate(self):
+ """
+			Called to deprecate any builds that match the given name
+ """
+ builds = self.backend.builds._get_builds("""
+ SELECT
+ *
+ FROM
+ repositories
+ LEFT JOIN
+ repository_builds ON repositories.id = repository_builds.repo_id
+ LEFT JOIN
+ builds ON repository_builds.build_id = builds.id
+ LEFT JOIN
+ packages ON builds.pkg_id = packages.id
+ WHERE
+ repositories.deleted_at IS NULL
+ AND
+ repositories.distro_id = %s
+ AND
+ repositories.owner_id IS NULL
+ AND
+ repository_builds.removed_at IS NULL
+ AND
+ builds.deleted_at IS NULL
+ AND
+ builds.deprecated_by IS NULL
+ AND
+ packages.deleted_at IS NULL
+ AND
+ packages.name = %s
+ """, self.source.distro, self.name,
+ )
+
+ # Deprecate all matching builds
+ for build in builds:
+ build.deprecate()
+
+ async def _run_dist(self):
+ """
+ Called to run dist on the given package
+ """
+ upload = None
+
+		# Mark as processed up front so this job is not picked up again
+ self._set_attribute_now("processed_at")
+
+ try:
+ # Create a new temporary directory and check out the requested revision
+ with tempfile.TemporaryDirectory() as path:
+ await self.source.git.checkout(self.commit.revision, path)
+
+ # Create a path for the source packages
+ target = os.path.join(path, ".target")
+
+ # Find the absolute path of the makefile
+ makefile = os.path.join(path, "%s/%s.nm" % (self.name, self.name))
+
+ # Launch a Pakfire instance with the repository configuration
+ with self.source.repo.pakfire() as p:
+ log.debug("Running dist for %s..." % makefile)
+
+					# Run dist() to create the source package
+ file = p.dist(makefile, target)
+
+ # Upload the file
+ upload = await self.backend.uploads.create_from_local(file)
+
+ # Create the package
+ # XXX reference the commit here?
+ package = await self.backend.packages.create(upload)
+
+ # Create a new build (without updating the repository immediately)
+ build = await self.backend.builds.create(
+ self.source.repo, package, group=self.commit.builds, update=False)
+
+ # XXX add watchers
+
+ # Return the build
+ return build
+
+ # Catch any exceptions and log them
+ except Exception as e:
+ log.error("Error running %s: " % self, exc_info=True)
+
+ # Store the error
+ self._set_attribute("error", "%s" % e)
+
+ # Always delete the upload
+ finally:
+ if upload:
+ await upload.delete()
class Git(object):
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.path)
+	# Locks (keyed by repository path) so that each repository
+	# is only accessed by one task at a time
+ __locks = {}
+
+ @property
+ def lock(self):
+ try:
+ lock = self.__locks[self.path]
+ except KeyError:
+ lock = self.__locks[self.path] = asyncio.Lock()
+
+ return lock
+
async def command(self, *args, **kwargs):
"""
Executes a Git command
if self.is_cloned():
cwd = self.path
- return await self.backend.command("git", *args, cwd=cwd, **kwargs)
+ async with self.lock:
+ return await self.backend.command("git", *args, cwd=cwd, **kwargs)
def is_cloned(self):
"""
"""
Fetches any changes
"""
- await self.command("fetch", self.url, self.branch)
+ await self.command("fetch", self.url, "%s:%s" % (self.branch, self.branch))
async def show_attribute(self, revision, format):
return await self.command(
"""
return await self.command("show-ref", "--hash", branch, return_output=True)
+ async def revisions(self, start, end):
+ """
+ Returns a list with all revisions between start and end
+ """
+		# Without a starting point, return only the current tip of the branch
+		if start is None:
+			revision = await self.show_ref(end)
+
+			return [revision.strip()]
+
+ # Fetch the hashes of all revisions
+ revisions = await self.command("log", "--format=%H",
+ "%s..%s" % (start, end), return_output=True)
+
+ # Return them in reverse order from oldest to newest
+ return reversed(revisions.splitlines())
+
async def changed_files(self, revision, filter=None):
"""
			Returns a list of files that have been changed
-- *not* creating schema, since initdb creates it
---
--- Name: on_update_current_timestamp_sources(); Type: FUNCTION; Schema: public; Owner: -
---
-
-CREATE FUNCTION public.on_update_current_timestamp_sources() RETURNS trigger
- LANGUAGE plpgsql
- AS $$
-BEGIN
- NEW.updated = now();
- RETURN NEW;
-END;
-$$;
-
-
SET default_tablespace = '';
SET default_table_access_method = heap;
);
+--
+-- Name: source_commit_jobs; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.source_commit_jobs (
+ id integer NOT NULL,
+ commit_id integer NOT NULL,
+ action text NOT NULL,
+ name text NOT NULL,
+ processed_at timestamp without time zone,
+ success boolean,
+ error text
+);
+
+
+--
+-- Name: source_commit_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.source_commit_jobs_id_seq
+ AS integer
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+--
+-- Name: source_commit_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.source_commit_jobs_id_seq OWNED BY public.source_commit_jobs.id;
+
+
--
-- Name: source_commits; Type: TABLE; Schema: public; Owner: -
--
body text NOT NULL,
date timestamp without time zone NOT NULL,
state text DEFAULT 'pending'::text NOT NULL,
- imported_at timestamp without time zone DEFAULT now() NOT NULL
+ created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ deleted_at timestamp without time zone,
+ build_group_id integer
);
+--
+-- Name: source_commits_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.source_commits_id_seq
+ START WITH 1
+ INCREMENT BY 1
+ NO MINVALUE
+ NO MAXVALUE
+ CACHE 1;
+
+
+--
+-- Name: source_commits_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.source_commits_id_seq OWNED BY public.source_commits.id;
+
+
--
-- Name: sources; Type: TABLE; Schema: public; Owner: -
--
gitweb text,
revision text NOT NULL,
branch text NOT NULL,
- last_updated_at timestamp without time zone,
+ last_check_at timestamp without time zone,
repo_id integer NOT NULL,
created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
created_by integer NOT NULL,
);
---
--- Name: sources_commits_id_seq; Type: SEQUENCE; Schema: public; Owner: -
---
-
-CREATE SEQUENCE public.sources_commits_id_seq
- START WITH 1
- INCREMENT BY 1
- NO MINVALUE
- NO MAXVALUE
- CACHE 1;
-
-
---
--- Name: sources_commits_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
---
-
-ALTER SEQUENCE public.sources_commits_id_seq OWNED BY public.source_commits.id;
-
-
--
-- Name: sources_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
ALTER TABLE ONLY public.sessions ALTER COLUMN id SET DEFAULT nextval('public.sessions_id_seq'::regclass);
+--
+-- Name: source_commit_jobs id; Type: DEFAULT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.source_commit_jobs ALTER COLUMN id SET DEFAULT nextval('public.source_commit_jobs_id_seq'::regclass);
+
+
--
-- Name: source_commits id; Type: DEFAULT; Schema: public; Owner: -
--
-ALTER TABLE ONLY public.source_commits ALTER COLUMN id SET DEFAULT nextval('public.sources_commits_id_seq'::regclass);
+ALTER TABLE ONLY public.source_commits ALTER COLUMN id SET DEFAULT nextval('public.source_commits_id_seq'::regclass);
--
ADD CONSTRAINT sessions_session_id_key UNIQUE (session_id);
+--
+-- Name: source_commit_jobs source_commit_jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.source_commit_jobs
+ ADD CONSTRAINT source_commit_jobs_pkey PRIMARY KEY (id);
+
+
--
-- Name: source_commits source_commits_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
CREATE UNIQUE INDEX repository_builds_unique ON public.repository_builds USING btree (repo_id, build_id) WHERE (removed_at IS NULL);
+--
+-- Name: source_commit_jobs_commit_id; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX source_commit_jobs_commit_id ON public.source_commit_jobs USING btree (commit_id);
+
+
--
-- Name: source_commits_unique; Type: INDEX; Schema: public; Owner: -
--
CREATE INDEX user_push_subscriptions_user_id ON public.user_push_subscriptions USING btree (user_id) WHERE (deleted_at IS NULL);
---
--- Name: sources on_update_current_timestamp; Type: TRIGGER; Schema: public; Owner: -
---
-
-CREATE TRIGGER on_update_current_timestamp BEFORE UPDATE ON public.sources FOR EACH ROW EXECUTE FUNCTION public.on_update_current_timestamp_sources();
-
-
--
-- Name: build_groups build_groups_created_by; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ADD CONSTRAINT sessions_user_id FOREIGN KEY (user_id) REFERENCES public.users(id);
+--
+-- Name: source_commit_jobs source_commit_jobs_commit_id; Type: FK CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.source_commit_jobs
+ ADD CONSTRAINT source_commit_jobs_commit_id FOREIGN KEY (commit_id) REFERENCES public.source_commits(id);
+
+
+--
+-- Name: source_commits source_commits_build_group_id; Type: FK CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.source_commits
+ ADD CONSTRAINT source_commits_build_group_id FOREIGN KEY (build_group_id) REFERENCES public.build_groups(id);
+
+
--
-- Name: source_commits sources_commits_source_id; Type: FK CONSTRAINT; Schema: public; Owner: -
--