From: Michael Tremer Date: Mon, 22 May 2023 19:18:35 +0000 (+0000) Subject: sources: WIP of a refactoring X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=07c337e76ba4b1a70654be4e1e8dce549a71d20a;p=pbs.git sources: WIP of a refactoring This introduces source commits jobs since we split importing commits and the the individual changes in it... Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/builds.py b/src/buildservice/builds.py index 038d45fd..c64221d8 100644 --- a/src/buildservice/builds.py +++ b/src/buildservice/builds.py @@ -177,7 +177,7 @@ class Builds(base.Object): return list(builds) async def create(self, repo, package, owner=None, group=None, test=False, - disable_test_builds=False): + disable_test_builds=False, update=True): """ Creates a new build based on the given distribution and package """ @@ -225,7 +225,7 @@ class Builds(base.Object): build._add_watchers() # Add the build into its repository - await repo.add_build(build, user=owner) + await repo.add_build(build, user=owner, update=update) return build diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index bbd4334b..c4c251fe 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -101,15 +101,20 @@ class Jobs(base.Object): if not jobs: return - # Perform dependency checks for all jobs - results = await asyncio.gather( - *(job.depcheck() for job in jobs), - ) + tasks = [] - # Try to dispatch any jobs afterwards - if any(results): - await self.backend.jobs.queue.dispatch() + async with asyncio.TaskGroup() as tg: + for job in jobs: + task = tg.create_task(job.depcheck()) + tasks.append(task) + # Try to dispatch any jobs afterwards if at least one task returned True + for task in tasks: + if not task.result(): + continue + + await self.backend.jobs.queue.dispatch() + break class Queue(base.Object): def init(self): diff --git a/src/buildservice/repository.py b/src/buildservice/repository.py index e9037dc6..ecd84481 100644 --- a/src/buildservice/repository.py +++ b/src/buildservice/repository.py @@ -135,12 +135,12 @@ class Repositories(base.Object): return self._get_repository("SELECT * FROM repositories \ WHERE id = %s", repo_id) - async def fetch_sources(self): + async def check_sources(self): """ - Fetches sources for all repositories + Checks sources for all repositories """ for repo in self: - await repo.fetch_sources() + await repo.check_sources() async def write(self): """ @@ -517,15 +517,15 @@ class Repository(base.DataObject): repo=self, ) - async def fetch_sources(self): + async def check_sources(self): """ Fetches all sources for this repository """ log.debug("Fetching all sources for %s" % self) - await asyncio.gather( - *(source.fetch() for source in self.sources), - ) + # Do not do this concurrently + for source in self.sources: + await source.check() # Builds @@ -851,7 +851,7 @@ class Repository(base.DataObject): # Run the update process at the next convenient moment self.backend.run_task(self.update) - async def update(self): + async def update(self, skip_depcheck=False): """ Called to perform an update of this repository """ @@ -859,7 +859,8 @@ class Repository(base.DataObject): await self.write() # Relaunch any pending jobs - await self.relaunch_pending_jobs() + if not skip_depcheck: + await self.relaunch_pending_jobs() async def relaunch_pending_jobs(self): # Perform this on this repository diff --git a/src/buildservice/sources.py b/src/buildservice/sources.py index 011aea9e..5617853a 100644 --- a/src/buildservice/sources.py +++ b/src/buildservice/sources.py @@ -5,11 +5,7 @@ import datetime import fnmatch import logging import os -import pakfire -import pakfire.config import re -import shutil -import subprocess import tempfile from . import base @@ -36,6 +32,21 @@ VALID_TAGS = ( ) class Sources(base.Object): + def __iter__(self): + sources = self._get_sources(""" + SELECT + * + FROM + sources + WHERE + deleted_at IS NULL + ORDER BY + created_at + """, + ) + + return iter(sources) + def _get_sources(self, query, *args, **kwargs): return self.db.fetch_many(Source, query, *args, **kwargs) @@ -117,6 +128,21 @@ class Sources(base.Object): """, id, ) + # Check + + async def check(self): + for source in self: + await source.check() + + # Process + + async def process(self): + """ + Processes all sources + """ + for source in self: + await source.process() + class Source(base.DataObject): table = "sources" @@ -127,6 +153,11 @@ class Source(base.DataObject): def __str__(self): return self.name + @lazy_property + def git(self): + # Setup the Git repository + return Git(self.backend, self.path, self.url, self.branch) + # Name def get_name(self): @@ -143,6 +174,8 @@ class Source(base.DataObject): def slug(self): return self.data.slug + # Distro + @property def distro(self): return self.repo.distro @@ -320,179 +353,120 @@ class Source(base.DataObject): source_id = %s AND revision = %s - """, self.id, revision, + """, self.id, revision, source=self, ) - # Populate cache - if commit: - commit.source = self - return commit + # Check + + @property + def last_check_at(self): + return self.data.last_check_at + # Operations - async def fetch(self): + async def check(self): """ Fetches any new commits from this source """ log.debug("Fetching %s" % self) - # Setup the Git repository - git = Git(self.backend, self.path, self.url, self.branch) - # Update or clone the repository - if git.is_cloned(): - await git.fetch() + if self.git.is_cloned(): + await self.git.fetch() else: - await git.clone() + await self.git.clone() # Determine which commits there are to process - revisions = [ - await git.show_ref(self.branch), - ] - - # Process all revisions one after the other - for revision in revisions: - with self.db.transaction(): - await self._process_revision(git, revision) - - async def _process_revision(self, git, revision): - """ - Processes a certain revision - """ - # Create the commit metadata - commit = await self._import_commit(git, revision) - - # Find changed files - changed_files = await git.changed_files(revision, filter="*/*.nm") - - # Exit if there have not been any changes - if not changed_files: - log.debug("%s has no changes") - return + revisions = await self.git.revisions(self.revision, self.branch) - deleted_packages = [] - dist_files = [] + with self.db.transaction(): + for revision in revisions: + # Import the commit + await self._create_commit(revision) - # Run through all changed files - for status, filename in changed_files: - # Collect the names of all deleted packages - if status == "D": - deleted_packages.append( - self._filename_to_package(filename), - ) - - # For any other changed makefiles, we collect their paths - else: - dist_files.append(filename) - - # Deprecate any deleted packages - if deleted_packages: - self._deprecate_packages(deleted_packages) - - # Create builds from all other changed files - if dist_files: - await self._dist_files(git, revision, dist_files) - - def _filename_to_package(self, filename): - """ - Maps a filename to a package name - """ - name = os.path.dirname(filename) - - # Check that the file part matches - if not filename.endswith("/%s.nm" % name): - raise ValueError("Invalid package name") + # Store the updated revision + self._set_attribute("revision", revision) - return name + # Store when we checked + self._set_attribute_now("last_check_at") - def _deprecate_packages(self, names): + async def process(self): """ - Called to deprecate any packages that match any of the names + Processes all commits that are not processed or had an error """ - builds = self.backend.builds._get_builds(""" + commits = self.backend.sources._get_commits(""" SELECT * FROM - builds - LEFT JOIN - packages ON builds.pkg_id = packages.id + source_commits WHERE - builds.deleted_at IS NULL - AND - builds.deprecated_by IS NULL - AND - builds.build_repo_id = %s + deleted_at IS NULL AND - packages.name = ANY(%s) - """, self.repo, names, + source_id = %s + ORDER BY + created_at ASC, id ASC + """, self.id, ) - # Deprecate all matching builds - for build in builds: - build.deprecate() - - async def _dist_files(self, git, revision, files): - """ - Runs "pakfire dist" on all given files at the given revision - """ - with tempfile.TemporaryDirectory() as path: - # Checkout the revision into a new temporary directory - await git.checkout(revision, path) - - uploads = [] - - # Launch a Pakfire instance with the repository configuration - try: - with self.repo.pakfire() as p: - # Walk through all files one by one - for file in files: - # Run dist() - file = p.dist(file) + # XXX filter for non-finished commits - # Upload the file - uploads.append( - await upload.create_from_file(file), - ) - - # Remove the source file - os.unlink(file) - - finally: - # Delete any uploads - await asyncio.gather( - *(upload.delete() for upload in uploads) - ) + for commit in commits: + await commit.process() class Commit(base.DataObject): table = "source_commits" + def __str__(self): + return "%s - %s" % (self.revision[:8], self.subject) + + # Revision + @property def revision(self): return self.data.revision + # Source + @lazy_property def source(self): return self.backend.sources.get_by_id(self.data.source_id) + # State + def set_state(self, state): self._set_attribute("state", state) state = property(lambda s: s.data.state, set_state) + # Author + @lazy_property def author(self): return self.backend.users.get_by_email(self.data.author) or self.data.author + # Committer + @lazy_property def committer(self): return self.backend.users.get_by_email(self.data.committer) or self.data.committer + # Date + + @property + def date(self): + return self.data.date + + # Subject + @property def subject(self): return self.data.subject.strip() + # Body + @property def body(self): return self.data.body.strip() @@ -547,7 +521,7 @@ class Commit(base.DataObject): raise ValueError("Unknown tag: %s" % tag) # Compile regex - r = re.compile("^%s:? (.*)$" % tag, re.IGNORECASE) + r = re.compile(r"^%s:? (.*)$" % tag, re.IGNORECASE) values = [] for line in self.body.splitlines(): @@ -602,15 +576,208 @@ class Commit(base.DataObject): """ return self.get_tag("Fixes") - @property - def date(self): - return self.data.date + # Builds + + @lazy_property + def builds(self): + if self.data.build_group_id: + return self.backend.builds.groups.get_by_id(self.data.build_group_id) + + # Jobs + + def _get_jobs(self, query, *args, **kwargs): + return self.db.fetch_many(Job, query, *args, commit=self, **kwargs) + + def _get_job(self, query, *args, **kwargs): + return self.db.fetch_one(Job, query, *args, commit=self, **kwargs) + + def _create_job(self, action, name): + """ + Creates a new job + """ + job = self._get_job(""" + INSERT INTO + source_commit_jobs + ( + commit_id, + action, + name + ) + VALUES + ( + %s, %s, %s + ) + RETURNING * + """, self.id, action, name, + ) + + # Append to cache + self.jobs.add(job) + + return job + + @lazy_property + def jobs(self): + jobs = self._get_jobs(""" + SELECT + * + FROM + source_commit_jobs + WHERE + commit_id = %s + """, self.id, + ) + + return set(jobs) + + async def process(self): + for job in self.jobs: + await job.run() + + +class Job(base.DataObject): + table = "source_commit_jobs" + + # Source + + @lazy_property + def source(self): + return self.commit.source + + # Commit @lazy_property - def packages(self): - return self.backend.packages._get_packages("SELECT * FROM packages \ - WHERE commit_id = %s", self.id) + def commit(self): + return self.backend.sources.get_commit_by_id(self.data.commit_id) + + # Action + + @property + def action(self): + return self.data.action + + # Name + + @property + def name(self): + return self.data.name + + # Run + + async def run(self): + """ + Runs the job + """ + build = None + + log.debug("Running %s..." % self) + with self.db.transaction(): + if self.action == "dist": + await self._run_dist() + + elif self.action == "deprecate": + await self._run_deprecate() + + else: + raise RuntimeError("Unhandled action: %s" % self.action) + + # Launch all jobs (in the background) + if build: + self.backend.run_task(self.backend.builds.launch, [build]) + + async def _run_deprecate(self): + """ + Called to deprecate any builds that match any of the name + """ + builds = self.backend.builds._get_builds(""" + SELECT + * + FROM + repositories + LEFT JOIN + repository_builds ON repositories.id = repository_builds.repo_id + LEFT JOIN + builds ON repository_builds.build_id = builds.id + LEFT JOIN + packages ON builds.pkg_id = packages.id + WHERE + repositories.deleted_at IS NULL + AND + repositories.distro_id = %s + AND + repositories.owner_id IS NULL + AND + repository_builds.removed_at IS NULL + AND + builds.deleted_at IS NULL + AND + builds.deprecated_by IS NULL + AND + packages.deleted_at IS NULL + AND + packages.name = %s + """, self.source.distro, self.name, + ) + + # Deprecate all matching builds + for build in builds: + build.deprecate() + + async def _run_dist(self): + """ + Called to run dist on the given package + """ + upload = None + + # Set as processed + self._set_attribute_now("processed_at") + + try: + # Create a new temporary directory and check out the requested revision + with tempfile.TemporaryDirectory() as path: + await self.source.git.checkout(self.commit.revision, path) + + # Create a path for the source packages + target = os.path.join(path, ".target") + + # Find the absolute path of the makefile + makefile = os.path.join(path, "%s/%s.nm" % (self.name, self.name)) + + # Launch a Pakfire instance with the repository configuration + with self.source.repo.pakfire() as p: + log.debug("Running dist for %s..." % makefile) + + # Run dist() + file = p.dist(makefile, target) + + # Upload the file + upload = await self.backend.uploads.create_from_local(file) + + # Create the package + # XXX reference the commit here? + package = await self.backend.packages.create(upload) + + # Create a new build (without updating the repository immediately) + build = await self.backend.builds.create( + self.source.repo, package, group=self.commit.builds, update=False) + + # XXX add watchers + + # Return the build + return build + + # Catch any exceptions and log them + except Exception as e: + log.error("Error running %s: " % self, exc_info=True) + + # Store the error + self._set_attribute("error", "%s" % e) + + # Always delete the upload + finally: + if upload: + await upload.delete() class Git(object): @@ -626,6 +793,18 @@ class Git(object): def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.path) + # Locks so that we will access a repository once at a time + __locks = {} + + @property + def lock(self): + try: + lock = self.__locks[self.path] + except KeyError: + lock = self.__locks[self.path] = asyncio.Lock() + + return lock + async def command(self, *args, **kwargs): """ Executes a Git command @@ -636,7 +815,8 @@ class Git(object): if self.is_cloned(): cwd = self.path - return await self.backend.command("git", *args, cwd=cwd, **kwargs) + async with self.lock: + return await self.backend.command("git", *args, cwd=cwd, **kwargs) def is_cloned(self): """ @@ -678,7 +858,7 @@ class Git(object): """ Fetches any changes """ - await self.command("fetch", self.url, self.branch) + await self.command("fetch", self.url, "%s:%s" % (self.branch, self.branch)) async def show_attribute(self, revision, format): return await self.command( @@ -695,6 +875,20 @@ class Git(object): """ return await self.command("show-ref", "--hash", branch, return_output=True) + async def revisions(self, start, end): + """ + Returns a list with all revisions between start and end + """ + if start is None: + return await self.show_ref(end) + + # Fetch the hashes of all revisions + revisions = await self.command("log", "--format=%H", + "%s..%s" % (start, end), return_output=True) + + # Return them in reverse order from oldest to newest + return reversed(revisions.splitlines()) + async def changed_files(self, revision, filter=None): """ Returns a list of files that has been changed diff --git a/src/database.sql b/src/database.sql index 05942177..dd51f9b2 100644 --- a/src/database.sql +++ b/src/database.sql @@ -23,20 +23,6 @@ SET row_security = off; -- *not* creating schema, since initdb creates it --- --- Name: on_update_current_timestamp_sources(); Type: FUNCTION; Schema: public; Owner: - --- - -CREATE FUNCTION public.on_update_current_timestamp_sources() RETURNS trigger - LANGUAGE plpgsql - AS $$ -BEGIN - NEW.updated = now(); - RETURN NEW; -END; -$$; - - SET default_tablespace = ''; SET default_table_access_method = heap; @@ -940,6 +926,41 @@ CREATE TABLE public.settings ( ); +-- +-- Name: source_commit_jobs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.source_commit_jobs ( + id integer NOT NULL, + commit_id integer NOT NULL, + action text NOT NULL, + name text NOT NULL, + processed_at timestamp without time zone, + success boolean, + error text +); + + +-- +-- Name: source_commit_jobs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.source_commit_jobs_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: source_commit_jobs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.source_commit_jobs_id_seq OWNED BY public.source_commit_jobs.id; + + -- -- Name: source_commits; Type: TABLE; Schema: public; Owner: - -- @@ -954,10 +975,31 @@ CREATE TABLE public.source_commits ( body text NOT NULL, date timestamp without time zone NOT NULL, state text DEFAULT 'pending'::text NOT NULL, - imported_at timestamp without time zone DEFAULT now() NOT NULL + created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, + deleted_at timestamp without time zone, + build_group_id integer ); +-- +-- Name: source_commits_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.source_commits_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: source_commits_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.source_commits_id_seq OWNED BY public.source_commits.id; + + -- -- Name: sources; Type: TABLE; Schema: public; Owner: - -- @@ -970,7 +1012,7 @@ CREATE TABLE public.sources ( gitweb text, revision text NOT NULL, branch text NOT NULL, - last_updated_at timestamp without time zone, + last_check_at timestamp without time zone, repo_id integer NOT NULL, created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, created_by integer NOT NULL, @@ -979,25 +1021,6 @@ CREATE TABLE public.sources ( ); --- --- Name: sources_commits_id_seq; Type: SEQUENCE; Schema: public; Owner: - --- - -CREATE SEQUENCE public.sources_commits_id_seq - START WITH 1 - INCREMENT BY 1 - NO MINVALUE - NO MAXVALUE - CACHE 1; - - --- --- Name: sources_commits_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - --- - -ALTER SEQUENCE public.sources_commits_id_seq OWNED BY public.source_commits.id; - - -- -- Name: sources_id_seq; Type: SEQUENCE; Schema: public; Owner: - -- @@ -1274,11 +1297,18 @@ ALTER TABLE ONLY public.repository_builds ALTER COLUMN id SET DEFAULT nextval('p ALTER TABLE ONLY public.sessions ALTER COLUMN id SET DEFAULT nextval('public.sessions_id_seq'::regclass); +-- +-- Name: source_commit_jobs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.source_commit_jobs ALTER COLUMN id SET DEFAULT nextval('public.source_commit_jobs_id_seq'::regclass); + + -- -- Name: source_commits id; Type: DEFAULT; Schema: public; Owner: - -- -ALTER TABLE ONLY public.source_commits ALTER COLUMN id SET DEFAULT nextval('public.sources_commits_id_seq'::regclass); +ALTER TABLE ONLY public.source_commits ALTER COLUMN id SET DEFAULT nextval('public.source_commits_id_seq'::regclass); -- @@ -1461,6 +1491,14 @@ ALTER TABLE ONLY public.sessions ADD CONSTRAINT sessions_session_id_key UNIQUE (session_id); +-- +-- Name: source_commit_jobs source_commit_jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.source_commit_jobs + ADD CONSTRAINT source_commit_jobs_pkey PRIMARY KEY (id); + + -- -- Name: source_commits source_commits_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -1794,6 +1832,13 @@ CREATE INDEX repository_builds_repo_id ON public.repository_builds USING btree ( CREATE UNIQUE INDEX repository_builds_unique ON public.repository_builds USING btree (repo_id, build_id) WHERE (removed_at IS NULL); +-- +-- Name: source_commit_jobs_commit_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX source_commit_jobs_commit_id ON public.source_commit_jobs USING btree (commit_id); + + -- -- Name: source_commits_unique; Type: INDEX; Schema: public; Owner: - -- @@ -1829,13 +1874,6 @@ CREATE UNIQUE INDEX uploads_uuid ON public.uploads USING btree (uuid); CREATE INDEX user_push_subscriptions_user_id ON public.user_push_subscriptions USING btree (user_id) WHERE (deleted_at IS NULL); --- --- Name: sources on_update_current_timestamp; Type: TRIGGER; Schema: public; Owner: - --- - -CREATE TRIGGER on_update_current_timestamp BEFORE UPDATE ON public.sources FOR EACH ROW EXECUTE FUNCTION public.on_update_current_timestamp_sources(); - - -- -- Name: build_groups build_groups_created_by; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -2252,6 +2290,22 @@ ALTER TABLE ONLY public.sessions ADD CONSTRAINT sessions_user_id FOREIGN KEY (user_id) REFERENCES public.users(id); +-- +-- Name: source_commit_jobs source_commit_jobs_commit_id; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.source_commit_jobs + ADD CONSTRAINT source_commit_jobs_commit_id FOREIGN KEY (commit_id) REFERENCES public.source_commits(id); + + +-- +-- Name: source_commits source_commits_build_group_id; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.source_commits + ADD CONSTRAINT source_commits_build_group_id FOREIGN KEY (build_group_id) REFERENCES public.build_groups(id); + + -- -- Name: source_commits sources_commits_source_id; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/src/scripts/pakfire-build-service b/src/scripts/pakfire-build-service index 6d5f5fee..ee331788 100644 --- a/src/scripts/pakfire-build-service +++ b/src/scripts/pakfire-build-service @@ -44,11 +44,15 @@ class Cli(object): "releasemonitoring:check" : self._release_monitoring_check, # Repositories - "repos:fetch-sources" : self.backend.repos.fetch_sources, + "repos:check-sources" : self.backend.repos.check_sources, "repos:relaunch-pending-jobs" : self._repos_relaunch_pending_jobs, "repos:rotate-keys" : self.backend.repos.rotate_keys, "repos:write" : self.backend.repos.write, + # Sources + "sources:check" : self.backend.sources.check, + "sources:process" : self.backend.sources.process, + # Sync "sync" : self.backend.sync, diff --git a/src/templates/sources/show.html b/src/templates/sources/show.html index 47ba7669..31b9b564 100644 --- a/src/templates/sources/show.html +++ b/src/templates/sources/show.html @@ -45,6 +45,17 @@

{{ source }}

+ +
+
+
+

{{ _("Last Check") }}

+

+ {{ locale.format_date(source.last_check_at, shorter=True) }} +

+
+
+