From: Michael Tremer Date: Fri, 21 Dec 2012 15:40:14 +0000 (+0100) Subject: manager: Update dist and repository update jobs. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e6fa8404be91bde693a0a4414f21d57731cfa7bb;p=pbs.git manager: Update dist and repository update jobs. --- diff --git a/manager/__init__.py b/manager/__init__.py index d3713a4d..fb888606 100644 --- a/manager/__init__.py +++ b/manager/__init__.py @@ -4,7 +4,7 @@ import base from bugs import BugsUpdateEvent from builds import BuildsFailedRestartEvent, CheckBuildDependenciesEvent -from builds import CreateTestBuildsEvent +from builds import CreateTestBuildsEvent, DistEvent from messages import MessagesSendEvent from repositories import RepositoriesUpdateEvent from sessions import SessionsCleanupEvent diff --git a/manager/builds.py b/manager/builds.py index ab1c5f68..395e265e 100644 --- a/manager/builds.py +++ b/manager/builds.py @@ -2,9 +2,18 @@ import datetime import logging +import pakfire +import pakfire.config +import shutil +import tempfile + +import backend.builds +import backend.git import base +from pakfire.constants import * + class BuildsFailedRestartEvent(base.Event): # Run when idle. priority = 5 @@ -64,8 +73,12 @@ class CheckBuildDependencyEvent(base.Event): priority = 3 def run(self, job_id): + self.run_subprocess(self._run, job_id) + + @staticmethod + def _run(_pakfire, job_id): # Get the build job we are working on. - job = self.pakfire.jobs.get_by_id(job_id) + job = _pakfire.jobs.get_by_id(job_id) if not job: logging.debug("Job %s does not exist." % job_id) return @@ -130,149 +143,77 @@ class CreateTestBuildsEvent(base.Event): break -class DistManager(object): - process = None +class DistEvent(base.Event): + interval = 60 first_run = True - def get_next_commit(self): - commits = self.pakfire.sources.get_pending_commits(limit=1) - - if not commits: - return - - return commits[0] - - @property - def timeout(self): - # If there are commits standing in line, we try to restart as soon - # as possible. - if self.get_next_commit(): - return 0 - - # Otherwise we wait at least for a minute. - return 60 - - def do(self): + def run(self): if self.first_run: self.first_run = False self.process = self.init_repos() - if self.process: - # If the process is still running, we check back in a couple of - # seconds. - if self.process.is_alive(): - return 1 - - # The process has finished its work. Clear everything up and - # go on. - self.commit = self.process = None - - # Search for a new commit to proceed with. - self.commit = commit = self.get_next_commit() - - # If no commit is there, we just wait for a minute. - if not commit: - return 60 - - # Got a commit to process. - commit.state = "running" - - logging.debug("Processing commit %s: %s" % (commit.revision, commit.subject)) - - # Get the repository of this commit. - repo = backend.git.Repo(self.pakfire, commit.source_id) - - # Make sure, it is checked out. - if not repo.cloned: - repo.clone() + for commit in self.pakfire.sources.get_pending_commits(): + commit.state = "running" - # Navigate to the right revision. - repo.checkout(commit.revision) + logging.debug("Processing commit %s: %s" % (commit.revision, commit.subject)) - # Get all changed makefiles. - deleted_files = [] - updated_files = [] + # Get the repository of this commit. + repo = backend.git.Repo(self.pakfire, commit.source_id) - for file in repo.changed_files(commit.revision): - # Don't care about files that are not a makefile. - if not file.endswith(".%s" % MAKEFILE_EXTENSION): - continue - - if os.path.exists(file): - updated_files.append(file) - else: - deleted_files.append(file) - - self.process = self.fork(commit_id=commit.id, updated_files=updated_files, - deleted_files=deleted_files) + # Make sure, it is checked out. + if not repo.cloned: + repo.clone() - return 1 + # Navigate to the right revision. + repo.checkout(commit.revision) - def fork(self, source_id=None, commit_id=None, updated_files=[], deleted_files=[]): - # Create the Process object. - process = multiprocessing.Process( - target=self._process, - args=(source_id, commit_id, updated_files, deleted_files) - ) + # Get all changed makefiles. + deleted_files = [] + updated_files = [] - # The process is running in daemon mode so it will try to kill - # all child processes when exiting. - process.daemon = True + for file in repo.changed_files(commit.revision): + # Don't care about files that are not a makefile. + if not file.endswith(".%s" % MAKEFILE_EXTENSION): + continue - # Start the process. - process.start() - logging.debug("Started new process pid=%s." % process.pid) + if os.path.exists(file): + updated_files.append(file) + else: + deleted_files.append(file) - return process + e = DistFileEvent(self.pakfire, None, commit.id, updated_files, deleted_files) + self.scheduler.add_event(e) def init_repos(self): - # Create the Process object. - process = multiprocessing.Process( - target=self._init_repos, - ) - - # The process is running in daemon mode so it will try to kill - # all child processes when exiting. - #process.daemon = True - - # Start the process. - process.start() - logging.debug("Started new process pid=%s." % process.pid) - - return process - - def _init_repos(self): - _pakfire = main.Pakfire() - sources = _pakfire.sources.get_all() - - for source in sources: + """ + Initialize all repositories. + """ + for source in self.pakfire.sources.get_all(): + # Skip those which already have a revision. if source.revision: continue - repo = GitRepo(_pakfire, source.id) + # Initialize the repository or and clone it if necessary. + repo = backend.git.Repo(self.pakfire, source.id) if not repo.cloned: repo.clone() + # Get a list of all files in the repository. files = repo.get_all_files() - for file in files: - if not file.endswith(".%s" % MAKEFILE_EXTENSION): - continue + for file in [f for f in files if file.endswith(".%s" % MAKEFILE_EXTENSION)]: + e = DistFileEvent(self.pakfire, source.id, None, [file,], []) + self.scheduler.add_event(e) - #files = [f for f in files if f.endswith(".%s" % MAKEFILE_EXTENSION)] - process = self.fork(source_id=source.id, updated_files=[file,], deleted_files=[]) - - while process.is_alive(): - time.sleep(1) - continue +class DistFileEvent(base.Event): + def run(self, *args): + self.run_subprocess(self._run, *args) @staticmethod - def _process(source_id, commit_id, updated_files, deleted_files): - _pakfire = main.Pakfire() - + def _run(_pakfire, source_id, commit_id, updated_files, deleted_files): commit = None source = None @@ -296,7 +237,7 @@ class DistManager(object): config = pakfire.config.Config(["general.conf",]) config.parse(source.distro.get_config()) - p = pakfire.Pakfire(mode="server", config=config) + p = pakfire.PakfireServer(config=config) pkgs = [] for file in updated_files: @@ -309,7 +250,7 @@ class DistManager(object): # Import all packages in one swoop. for pkg in pkgs: # Import the package file and create a build out of it. - builds.import_from_package(_pakfire, pkg, + backend.builds.import_from_package(_pakfire, pkg, distro=source.distro, commit=commit, type="release") except: @@ -322,13 +263,12 @@ class DistManager(object): if os.path.exists(pkg_dir): shutil.rmtree(pkg_dir) - for file in deleted_files: - # Determine the name of the package. - name = os.path.basename(file) - name = name[:len(MAKEFILE_EXTENSION) + 1] + for file in deleted_files: + # Determine the name of the package. + name = os.path.basename(file) + name = name[:len(MAKEFILE_EXTENSION) + 1] - if commit: - commit.distro.delete_package(name) + source.distro.delete_package(name) - if commit: - commit.state = "finished" + if commit: + commit.state = "finished" diff --git a/manager/repositories.py b/manager/repositories.py index b6c5c9a5..7f75941d 100644 --- a/manager/repositories.py +++ b/manager/repositories.py @@ -12,8 +12,8 @@ class RepositoriesUpdateEvent(base.Event): priority = 6 @property - def timeout(self): - return self.settings.get_int("repository_update_interval", 600) + def interval(self): + return self.pakfire.settings.get_int("repository_update_interval", 600) def run(self): for distro in self.pakfire.distros.get_all(): @@ -32,7 +32,12 @@ class RepositoryUpdateEvent(base.Event): priority = 5 def run(self, repo_id): - repo = self.pakfire.repos.get_by_id(repo_id) + # Run this in a new process. + self.run_subprocess_background(self.update_repo, repo_id) + + @staticmethod + def update_repo(_pakfire, repo_id): + repo = _pakfire.repos.get_by_id(repo_id) assert repo logging.info("Going to update repository %s..." % repo.name) @@ -44,7 +49,7 @@ class RepositoryUpdateEvent(base.Event): arches = repo.arches # Add the source repository. - arches.append(self.pakfire.arches.get_by_name("src")) + arches.append(_pakfire.arches.get_by_name("src")) for arch in arches: changed = False @@ -111,7 +116,7 @@ class RepositoryUpdateEvent(base.Event): p.repo_create(repo_path, source_files, name="%s - %s.%s" % (repo.distro.name, repo.name, arch.name), - key_id=key_id, type=arch.build_type) + key_id=key_id) # Remove files afterwards. for file in remove_files: @@ -121,4 +126,3 @@ class RepositoryUpdateEvent(base.Event): os.remove(file) except OSError: logging.warning("Could not remove %s." % file) - diff --git a/pakfire-manager b/pakfire-manager index b84d1463..01dcf42c 100644 --- a/pakfire-manager +++ b/pakfire-manager @@ -23,6 +23,7 @@ events = ( manager.CheckBuildDependenciesEvent, manager.CleanupFilesEvent, manager.CreateTestBuildsEvent, + manager.DistEvent, manager.MessagesSendEvent, manager.RepositoriesUpdateEvent, manager.SessionsCleanupEvent,