From: Michael Tremer Date: Thu, 13 Dec 2012 23:18:54 +0000 (+0100) Subject: Initial import of the new manager daemon. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=83be31065c3a16e8ff2ef5c314f70cf687d444a5;p=pbs.git Initial import of the new manager daemon. --- diff --git a/backend/builds.py b/backend/builds.py index ab5ce031..9d36fc0e 100644 --- a/backend/builds.py +++ b/backend/builds.py @@ -193,7 +193,8 @@ class Builds(base.Object): jobs.state = 'finished' AND \ jobs.time_finished < %s \ ) \ - AND builds.type = 'release' AND NOT builds.state = 'broken'" + AND builds.type = 'release' \ + AND (builds.state = 'stable' OR builds.state = 'testing')" args = [arch.id, threshold, arch.id, threshold] if randomize: @@ -1302,8 +1303,11 @@ class Jobs(base.Object): # Return sorted list of jobs. return sorted(jobs) - def get_active(self, host_id=None, uploads=True): - running_states = ["dispatching", "new", "pending", "running"] + def get_active(self, host_id=None, uploads=True, running_only=False): + running_states = ["dispatching", "running"] + + if not running_only: + running_states += ["new", "pending",] if uploads: running_states.append("uploading") @@ -2152,7 +2156,7 @@ class Job(base.Object): # Create a new pakfire instance with the configuration for # this build. - p = pakfire.Pakfire(mode="server", config=config, arch=self.arch.name) + p = pakfire.PakfireServer(config=config, arch=self.arch.name) # Try to solve the build dependencies. try: diff --git a/backend/git.py b/backend/git.py new file mode 100644 index 00000000..02d1de73 --- /dev/null +++ b/backend/git.py @@ -0,0 +1,147 @@ +#!/usr/bin/python + +import datetime +import logging +import os +import subprocess + +import base +import sources + +class Repo(base.Object): + def __init__(self, pakfire, id, mode="normal"): + base.Object.__init__(self, pakfire) + + assert mode in ("normal", "bare", "mirror") + + # Get the source object. + self.source = sources.Source(pakfire, id) + self.mode = mode + + @property + def path(self): + return os.path.join("/var/cache/pakfire/git-repos", self.source.identifier, self.mode) + + def git(self, cmd, path=None): + if not path: + path = self.path + + cmd = "cd %s && git %s" % (path, cmd) + + logging.debug("Running command: %s" % cmd) + + return subprocess.check_output(["/bin/sh", "-c", cmd]) + + @property + def cloned(self): + """ + Say if the repository is already cloned. + """ + return os.path.exists(self.path) + + def clone(self): + if self.cloned: + return + + path = os.path.dirname(self.path) + repo = os.path.basename(self.path) + + # Create the repository home directory if not exists. + if not os.path.exists(path): + os.makedirs(path) + + command = ["clone"] + if self.mode == "bare": + command.append("--bare") + elif self.mode == "mirror": + command.append("--mirror") + + command.append(self.source.url) + command.append(repo) + + # Clone the repository. + try: + self.git(" ".join(command), path=path) + except Exception: + shutil.rmtree(self.path) + raise + + def fetch(self): + # Make sure, the repository was already cloned. + if not self.cloned: + self.clone() + + self.git("fetch") + + def rev_list(self, revision=None): + if not revision: + if self.source.head_revision: + revision = self.source.head_revision.revision + else: + revision = self.source.start_revision + + command = "rev-list %s..%s" % (revision, self.source.branch) + + # Get all merge commits. 
+ merges = self.git("%s --merges" % command).splitlines() + + revisions = [] + for commit in self.git(command).splitlines(): + # Check if commit is a normal commit or merge commit. + merge = commit in merges + + revisions.append((commit, merge)) + + return [r for r in reversed(revisions)] + + def import_revisions(self): + # Get all pending revisions. + revisions = self.rev_list() + + for revision, merge in revisions: + # Actually import the revision. + self._import_revision(revision, merge) + + def _import_revision(self, revision, merge): + logging.debug("Going to import revision %s (merge: %s)." % (revision, merge)) + + rev_author = self.git("log -1 --format=\"%%an <%%ae>\" %s" % revision) + rev_committer = self.git("log -1 --format=\"%%cn <%%ce>\" %s" % revision) + rev_subject = self.git("log -1 --format=\"%%s\" %s" % revision) + rev_body = self.git("log -1 --format=\"%%b\" %s" % revision) + rev_date = self.git("log -1 --format=\"%%at\" %s" % revision) + rev_date = datetime.datetime.utcfromtimestamp(float(rev_date)) + + # Convert strings properly. No idea why I have to do that. + #rev_author = rev_author.decode("latin-1").strip() + #rev_committer = rev_committer.decode("latin-1").strip() + #rev_subject = rev_subject.decode("latin-1").strip() + #rev_body = rev_body.decode("latin-1").rstrip() + + # Create a new commit object in the database + commit = sources.Commit.create(self.pakfire, self.source, revision, + rev_author, rev_committer, rev_subject, rev_body, rev_date) + + def checkout(self, revision, update=False): + for update in (0, 1): + if update: + self.fetch() + + try: + self.git("checkout %s" % revision) + + except subprocess.CalledProcessError: + if not update: + continue + + raise + + def changed_files(self, revision): + files = self.git("diff --name-only %s^ %s" % (revision, revision)) + + return [os.path.join(self.path, f) for f in files.splitlines()] + + def get_all_files(self): + files = self.git("ls-files") + + return [os.path.join(self.path, f) for f in files.splitlines()] diff --git a/backend/main.py b/backend/main.py index be24867d..e27e5cca 100644 --- a/backend/main.py +++ b/backend/main.py @@ -28,7 +28,7 @@ import users from constants import * class Pakfire(object): - def __init__(self, config_file): + def __init__(self, config_file="pbs.conf"): # Read configuration file. self.config = self.read_config(config_file) diff --git a/backend/managers.py b/backend/managers.py deleted file mode 100644 index eeeb6829..00000000 --- a/backend/managers.py +++ /dev/null @@ -1,940 +0,0 @@ -#!/usr/bin/python - -import datetime -import logging -import multiprocessing -import os -import shutil -import subprocess -import tempfile -import time -import tornado.ioloop - -import pakfire -import pakfire.api -import pakfire.config -from pakfire.constants import * - -import base -import builds -import main -import packages -import sources - -from constants import * - -managers = [] - -class Manager(base.Object): - def __init__(self, pakfire): - base.Object.__init__(self, pakfire) - - self.pc = tornado.ioloop.PeriodicCallback(self, self.timeout * 1000) - - logging.info("%s was initialized." % self.__class__.__name__) - - self() - - def __call__(self): - logging.info("%s main method was called." % self.__class__.__name__) - - timeout = self.do() - - if timeout is None: - timeout = self.timeout - - # Update callback_time. - self.pc.callback_time = timeout * 1000 - logging.debug("Next call will be in ~%.2f seconds." 
% timeout) - - @property - def timeout(self): - """ - Return a new callback timeout in seconds. - """ - raise NotImplementedError - - def do(self): - raise NotImplementedError - - - # Helper functions - - def wait_for_processes(self): - ALIVE_CHECK_INTERVAL = 0.5 - - logging.debug("There are %s process(es) in the queue." % len(self.processes)) - - # Nothing to do, if there are no processes running. - if not self.processes: - return - - # Get the currently running? process. - process = self.processes[0] - - # If the first process is running, everything is okay and - # we'll have to wait. - if process.is_alive(): - return ALIVE_CHECK_INTERVAL - - # If the process has not been run, it is started now. - if process.exitcode is None: - logging.debug("Process %s is being started..." % process) - - process.start() - return ALIVE_CHECK_INTERVAL - - # If the process has stopped working we check why... - elif process.exitcode == 0: - logging.debug("Process %s has successfully finished." % process) - - elif process.exitcode > 0: - logging.error("Process %s has exited with code: %s" % \ - (process, process.exitcode)) - - elif process.exitcode < 0: - logging.error("Process %s has ended with signal %s" % \ - (process, process.exitcode)) - - # Remove process from process queue. - self.processes.remove(process) - - # If there are still processes in the queue, we start this function - # again... - if self.processes: - return self.wait_for_processes() - - -class MessagesManager(Manager): - @property - def messages(self): - """ - Shortcut to messages object. - """ - return self.pakfire.messages - - @property - def timeout(self): - # If we have messages, we should run as soon as possible. - if self.messages.count: - return 0 - - # Otherwise we sleep for "mesages_interval" - return self.settings.get_int("messages_interval", 10) - - def do(self): - logging.info("Sending a bunch of messages.") - - # Send up to 25 messages and return. - i = 0 - for msg in self.messages.get_all(limit=25): - try: - self.messages.send_msg(msg) - - except Exception, e: - logging.critical("There was an error sending mail: %s" % e) - raise - - else: - # If everything was okay, we can delete the message in the database. - self.messages.delete(msg.id) - i += 1 - - count = self.messages.count - - logging.debug("Successfully sent %s message(s). %s still in queue." \ - % (i, count)) - - # If there are still mails left, we start again as soon as possible. - if count: - return 0 - - return self.settings.get_int("messages_interval", 10) - - -managers.append(MessagesManager) - -class BugsUpdateManager(Manager): - @property - def timeout(self): - return self.settings.get_int("bugzilla_update_interval", 60) - - def do(self): - # Get up to ten updates. - query = self.db.query("SELECT * FROM builds_bugs_updates \ - WHERE error = 'N' ORDER BY time LIMIT 10") - - # XXX CHECK IF BZ IS ACTUALLY REACHABLE AND WORKING - - for update in query: - try: - bug = self.pakfire.bugzilla.get_bug(update.bug_id) - if not bug: - logging.error("Bug #%s does not exist." % update.bug_id) - continue - - # Set the changes. - bug.set_status(update.status, update.resolution, update.comment) - - except Exception, e: - # If there was an error, we save that and go on. - self.db.execute("UPDATE builds_bugs_updates SET error = 'Y', error_msg = %s \ - WHERE id = %s", "%s" % e, update.id) - - else: - # Remove the update when it has been done successfully. 
- self.db.execute("DELETE FROM builds_bugs_updates WHERE id = %s", update.id) - - -managers.append(BugsUpdateManager) - -class GitRepo(base.Object): - def __init__(self, pakfire, id, mode="normal"): - base.Object.__init__(self, pakfire) - - assert mode in ("normal", "bare", "mirror") - - # Get the source object. - self.source = sources.Source(pakfire, id) - self.mode = mode - - @property - def path(self): - return os.path.join("/var/cache/pakfire/git-repos", self.source.identifier, self.mode) - - def git(self, cmd, path=None): - if not path: - path = self.path - - cmd = "cd %s && git %s" % (path, cmd) - - logging.debug("Running command: %s" % cmd) - - return subprocess.check_output(["/bin/sh", "-c", cmd]) - - @property - def cloned(self): - """ - Say if the repository is already cloned. - """ - return os.path.exists(self.path) - - def clone(self): - if self.cloned: - return - - path = os.path.dirname(self.path) - repo = os.path.basename(self.path) - - # Create the repository home directory if not exists. - if not os.path.exists(path): - os.makedirs(path) - - command = ["clone"] - if self.mode == "bare": - command.append("--bare") - elif self.mode == "mirror": - command.append("--mirror") - - command.append(self.source.url) - command.append(repo) - - # Clone the repository. - try: - self.git(" ".join(command), path=path) - except Exception: - shutil.rmtree(self.path) - raise - - def fetch(self): - # Make sure, the repository was already cloned. - if not self.cloned: - self.clone() - - self.git("fetch") - - def rev_list(self, revision=None): - if not revision: - if self.source.head_revision: - revision = self.source.head_revision.revision - else: - revision = self.source.start_revision - - command = "rev-list %s..%s" % (revision, self.source.branch) - - # Get all merge commits. - merges = self.git("%s --merges" % command).splitlines() - - revisions = [] - for commit in self.git(command).splitlines(): - # Check if commit is a normal commit or merge commit. - merge = commit in merges - - revisions.append((commit, merge)) - - return [r for r in reversed(revisions)] - - def import_revisions(self): - # Get all pending revisions. - revisions = self.rev_list() - - for revision, merge in revisions: - # Actually import the revision. - self._import_revision(revision, merge) - - def _import_revision(self, revision, merge): - logging.debug("Going to import revision %s (merge: %s)." % (revision, merge)) - - rev_author = self.git("log -1 --format=\"%%an <%%ae>\" %s" % revision) - rev_committer = self.git("log -1 --format=\"%%cn <%%ce>\" %s" % revision) - rev_subject = self.git("log -1 --format=\"%%s\" %s" % revision) - rev_body = self.git("log -1 --format=\"%%b\" %s" % revision) - rev_date = self.git("log -1 --format=\"%%at\" %s" % revision) - rev_date = datetime.datetime.utcfromtimestamp(float(rev_date)) - - # Convert strings properly. No idea why I have to do that. 
- #rev_author = rev_author.decode("latin-1").strip() - #rev_committer = rev_committer.decode("latin-1").strip() - #rev_subject = rev_subject.decode("latin-1").strip() - #rev_body = rev_body.decode("latin-1").rstrip() - - # Create a new commit object in the database - commit = sources.Commit.create(self.pakfire, self.source, revision, - rev_author, rev_committer, rev_subject, rev_body, rev_date) - - def checkout(self, revision, update=False): - for update in (0, 1): - if update: - self.fetch() - - try: - self.git("checkout %s" % revision) - - except subprocess.CalledProcessError: - if not update: - continue - - raise - - def changed_files(self, revision): - files = self.git("diff --name-only %s^ %s" % (revision, revision)) - - return [os.path.join(self.path, f) for f in files.splitlines()] - - def get_all_files(self): - files = self.git("ls-files") - - return [os.path.join(self.path, f) for f in files.splitlines()] - - -class SourceManager(Manager): - @property - def sources(self): - return self.pakfire.sources - - @property - def timeout(self): - return self.settings.get_int("source_update_interval", 60) - - def do(self): - for source in self.sources.get_all(): - repo = GitRepo(self.pakfire, source.id, mode="mirror") - - # If the repository is not yet cloned, we need to make a local - # clone to work with. - if not repo.cloned: - repo.clone() - - # If we have cloned a new repository, we exit to not get over - # the time treshold. - return 0 - - # Otherwise we just fetch updates. - else: - repo.fetch() - - # Import all new revisions. - repo.import_revisions() - - -managers.append(SourceManager) - -class DistManager(Manager): - process = None - - first_run = True - - def get_next_commit(self): - commits = self.pakfire.sources.get_pending_commits(limit=1) - - if not commits: - return - - return commits[0] - - @property - def timeout(self): - # If there are commits standing in line, we try to restart as soon - # as possible. - if self.get_next_commit(): - return 0 - - # Otherwise we wait at least for a minute. - return 60 - - def do(self): - if self.first_run: - self.first_run = False - - self.process = self.init_repos() - - if self.process: - # If the process is still running, we check back in a couple of - # seconds. - if self.process.is_alive(): - return 1 - - # The process has finished its work. Clear everything up and - # go on. - self.commit = self.process = None - - # Search for a new commit to proceed with. - self.commit = commit = self.get_next_commit() - - # If no commit is there, we just wait for a minute. - if not commit: - return 60 - - # Got a commit to process. - commit.state = "running" - - logging.debug("Processing commit %s: %s" % (commit.revision, commit.subject)) - - # Get the repository of this commit. - repo = GitRepo(self.pakfire, commit.source_id) - - # Make sure, it is checked out. - if not repo.cloned: - repo.clone() - - # Navigate to the right revision. - repo.checkout(commit.revision) - - # Get all changed makefiles. - deleted_files = [] - updated_files = [] - - for file in repo.changed_files(commit.revision): - # Don't care about files that are not a makefile. - if not file.endswith(".%s" % MAKEFILE_EXTENSION): - continue - - if os.path.exists(file): - updated_files.append(file) - else: - deleted_files.append(file) - - self.process = self.fork(commit_id=commit.id, updated_files=updated_files, - deleted_files=deleted_files) - - return 1 - - def fork(self, source_id=None, commit_id=None, updated_files=[], deleted_files=[]): - # Create the Process object. 
- process = multiprocessing.Process( - target=self._process, - args=(source_id, commit_id, updated_files, deleted_files) - ) - - # The process is running in daemon mode so it will try to kill - # all child processes when exiting. - process.daemon = True - - # Start the process. - process.start() - logging.debug("Started new process pid=%s." % process.pid) - - return process - - def init_repos(self): - # Create the Process object. - process = multiprocessing.Process( - target=self._init_repos, - ) - - # The process is running in daemon mode so it will try to kill - # all child processes when exiting. - #process.daemon = True - - # Start the process. - process.start() - logging.debug("Started new process pid=%s." % process.pid) - - return process - - def _init_repos(self): - _pakfire = main.Pakfire() - sources = _pakfire.sources.get_all() - - for source in sources: - if source.revision: - continue - - repo = GitRepo(_pakfire, source.id) - if not repo.cloned: - repo.clone() - - files = repo.get_all_files() - - for file in files: - if not file.endswith(".%s" % MAKEFILE_EXTENSION): - continue - - #files = [f for f in files if f.endswith(".%s" % MAKEFILE_EXTENSION)] - - process = self.fork(source_id=source.id, updated_files=[file,], deleted_files=[]) - - while process.is_alive(): - time.sleep(1) - continue - - @staticmethod - def _process(source_id, commit_id, updated_files, deleted_files): - _pakfire = main.Pakfire() - - commit = None - source = None - - if commit_id: - commit = _pakfire.sources.get_commit_by_id(commit_id) - assert commit - - source = commit.source - - if source_id and not source: - source = _pakfire.sources.get_by_id(source_id) - - assert source - - if updated_files: - # Create a temporary directory where to put all the files - # that are generated here. - pkg_dir = tempfile.mkdtemp() - - try: - config = pakfire.config.Config(["general.conf",]) - config.parse(source.distro.get_config()) - - p = pakfire.Pakfire(mode="server", config=config) - - pkgs = [] - for file in updated_files: - try: - pkg_file = p.dist(file, pkg_dir) - pkgs.append(pkg_file) - except: - raise - - # Import all packages in one swoop. - for pkg in pkgs: - # Import the package file and create a build out of it. - builds.import_from_package(_pakfire, pkg, - distro=source.distro, commit=commit, type="release") - - except: - if commit: - commit.state = "failed" - - raise - - finally: - if os.path.exists(pkg_dir): - shutil.rmtree(pkg_dir) - - for file in deleted_files: - # Determine the name of the package. - name = os.path.basename(file) - name = name[:len(MAKEFILE_EXTENSION) + 1] - - if commit: - commit.distro.delete_package(name) - - if commit: - commit.state = "finished" - - -managers.append(DistManager) - -class BuildsManager(Manager): - @property - def timeout(self): - return self.settings.get_int("build_keepalive_interval", 900) - - def do(self): - threshold = datetime.datetime.utcnow() - datetime.timedelta(hours=72) - - for job in self.pakfire.jobs.get_next_iter(type="build", max_tries=9, states=["failed"]): - if job.build.state == "broken": - continue - - if not job.time_finished or job.time_finished > threshold: - continue - - # Restart the job. 
- logging.info("Restarting build job: %s" % job) - job.set_state("new", log=False) - - -managers.append(BuildsManager) - -class UploadsManager(Manager): - @property - def timeout(self): - return self.settings.get_int("uploads_remove_interval", 3600) - - def do(self): - self.pakfire.uploads.cleanup() - - -managers.append(UploadsManager) - -class RepositoryManager(Manager): - processes = [] - - @property - def timeout(self): - return self.settings.get_int("repository_update_interval", 600) - - def do(self): - for process in self.processes[:]: - # If the first process is running, everything is okay and - # we'll have to wait. - if process.is_alive(): - return 0.5 - - # If the process has not been run, it is started now. - if process.exitcode is None: - logging.debug("Process %s is being started..." % process) - - process.start() - return 1 - - # If the process has stopped working we check why... - else: - if process.exitcode == 0: - logging.debug("Process %s has successfully finished." % process) - - elif process.exitcode > 0: - logging.error("Process %s has exited with code: %s" % \ - (process, process.exitcode)) - - elif process.exitcode < 0: - logging.error("Process %s has ended with signal %s" % \ - (process, process.exitcode)) - - # Remove process from process queue. - self.processes.remove(process) - - # Start the loop again if there any processes left - # that need to be started. - if self.processes: - continue - - # Otherwise wait some time and start from the beginning. - else: - return self.settings.get_int("repository_update_interval", 600) - - for distro in self.pakfire.distros.get_all(): - for repo in distro.repositories: - # Skip repostories that do not need an update at all. - if not repo.needs_update(): - logging.info("Repository %s - %s is already up to date." % (distro.name, repo.name)) - continue - - # Create the Process object. - process = multiprocessing.Process( - target=self._process, - args=(repo.id,) - ) - - # Run the process in daemon mode. - process.daemon = True - - # Add the process to the process queue. - self.processes.append(process) - - # XXX DEVEL - #if self.processes: - # return 0 - #else: - # return - - # Create dependency updater after all repositories have been - # updated. - #jobs = self.pakfire.jobs.get_next_iter(states=["new", "dependency_error", "failed",]) - - #for job in jobs: - # process = multiprocessing.Process( - # target=self._dependency_update_process, - # args=(job.id,) - # ) - # process.daemon = True - # self.processes.append(process) - - # Start again as soon as possible. - #if self.processes: - # return 0 - - @staticmethod - def _process(repo_id): - _pakfire = main.Pakfire() - - repo = _pakfire.repos.get_by_id(repo_id) - assert repo - - logging.info("Going to update repository %s..." % repo.name) - - # Update the timestamp when we started at last. - repo.updated() - - # Find out for which arches this repository is created. - arches = repo.arches - - # Add the source repository. - arches.append(_pakfire.arches.get_by_name("src")) - - for arch in arches: - changed = False - - # Get all packages that are to be included in this repository. - pkgs = repo.get_packages(arch) - - # Log all packages. 
- for pkg in pkgs: - logging.info(" %s" % pkg) - - repo_path = os.path.join( - REPOS_DIR, - repo.distro.identifier, - repo.identifier, - arch.name - ) - - if not os.path.exists(repo_path): - os.makedirs(repo_path) - - source_files = [] - remove_files = [] - - for filename in os.listdir(repo_path): - path = os.path.join(repo_path, filename) - - if not os.path.isfile(path): - continue - - remove_files.append(path) - - for pkg in pkgs: - filename = os.path.basename(pkg.path) - - source_file = os.path.join(PACKAGES_DIR, pkg.path) - target_file = os.path.join(repo_path, filename) - - # Do not add duplicate files twice. - if source_file in source_files: - #logging.warning("Duplicate file detected: %s" % source_file) - continue - - source_files.append(source_file) - - try: - remove_files.remove(target_file) - except ValueError: - changed = True - - if remove_files: - changed = True - - # If nothing in the repository data has changed, there - # is nothing to do. - if changed: - logging.info("The repository has updates...") - else: - logging.info("Nothing to update.") - continue - - # Find the key to sign the package. - key_id = None - if repo.key: - key_id = repo.key.fingerprint - - # Create package index. - pakfire.api.repo_create(repo_path, source_files, - name="%s - %s.%s" % (repo.distro.name, repo.name, arch.name), - key_id=key_id, type=arch.build_type, mode="server") - - # Remove files afterwards. - for file in remove_files: - file = os.path.join(repo_path, file) - - try: - os.remove(file) - except OSError: - logging.warning("Could not remove %s." % file) - - @staticmethod - def _dependency_update_process(job_id): - _pakfire = main.Pakfire() - - job = _pakfire.jobs.get_by_id(job_id) - assert job - - job.resolvdep() - - -managers.append(RepositoryManager) - -class TestManager(Manager): - @property - def timeout(self): - return 300 - - @property - def test_threshold(self): - threshold_days = self.pakfire.settings.get_int("test_threshold_days", 14) - - return datetime.datetime.utcnow() - datetime.timedelta(days=threshold_days) - - def do(self): - max_queue_length = self.pakfire.settings.get_int("test_queue_limit", 10) - - # Get a list with all feasible architectures. - arches = self.pakfire.arches.get_all() - noarch = self.pakfire.arches.get_by_name("noarch") - if noarch: - arches.append(noarch) - - for arch in arches: - # Get the job queue for each architecture. - queue = self.pakfire.jobs.get_next(arches=[arch,]) - - # Skip adding new jobs if there are more too many jobs in the queue. - limit = max_queue_length - len(queue) - if limit <= 0: - logging.debug("Already too many jobs in queue of %s to create tests." % arch.name) - continue - - # Get a list of builds, with potentially need a test build. - # Randomize the output and do not return more jobs than we are - # allowed to put into the build queue. - builds = self.pakfire.builds.needs_test(self.test_threshold, - arch=arch, limit=limit, randomize=True) - - if not builds: - logging.debug("No builds needs a test for %s." % arch.name) - continue - - # Search for the job with the right architecture in each - # build and schedule a test job. 
- for build in builds: - for job in build.jobs: - if not job.arch == arch: - continue - - job.schedule("test") - break - - -managers.append(TestManager) - - -class DependencyChecker(Manager): - processes = [] - - @property - def timeout(self): - return self.settings.get_int("dependency_checker_interval", 30) - - def do(self): - if self.processes: - return self.wait_for_processes() - - return self.search_jobs() - - def search_jobs(self): - # Find the jobs who need the update the most. - job_ids = [] - - # Get all jobs in new state, no matter how many these are. - query = self.db.query("SELECT id FROM jobs WHERE state = 'new'") - for job in query: - job_ids.append(job.id) - - # If there are no jobs to check, search deeper. - if not job_ids: - query = self.db.query("SELECT id FROM jobs \ - WHERE state = 'dependency_error' AND time_finished < DATE_SUB(NOW(), INTERVAL 5 MINUTE) \ - ORDER BY time_finished LIMIT 50") - - for job in query: - job_ids.append(job.id) - - # Create a subprocess for every single job we have to work on. - for job_id in job_ids: - process = multiprocessing.Process( - target=self._process, args=(job_id,) - ) - process.daemon = True - self.processes.append(process) - - # Start immediately again if we have running subprocesses. - if self.processes: - return 0 - - @staticmethod - def _process(job_id): - # Create a new pakfire instance. - _pakfire = main.Pakfire() - - # Get the build job we are working on. - job = _pakfire.jobs.get_by_id(job_id) - assert job - - # Check if the job status has changed in the meanwhile. - if not job.state in ("new", "dependency_error", "failed"): - logging.warning("Job status has already changed: %s - %s" % (job.name, job.state)) - return - - # Resolve the dependencies. - job.resolvdep() - - -managers.append(DependencyChecker) - - -class DeleteManager(Manager): - @property - def timeout(self): - return 300 - - def do(self): - self.pakfire.cleanup_files() - - -managers.append(DeleteManager) - -class SessionsManager(Manager): - """ - Cleans up sessions that are not valid anymore. - Keeps the database smaller. 
- """ - - @property - def timeout(self): - return 3600 - - def do(self): - self.pakfire.sessions.cleanup() - - -managers.append(SessionsManager) diff --git a/backend/repository.py b/backend/repository.py index c75e26c6..b07652c4 100644 --- a/backend/repository.py +++ b/backend/repository.py @@ -333,9 +333,9 @@ class Repository(base.Object): return _builds - def get_packages(self, arch): + def _get_packages(self, arch): if arch.name == "src": - pkgs = self.db.query("SELECT packages.id AS id FROM packages \ + pkgs = self.db.query("SELECT packages.id AS id, packages.path AS path FROM packages \ JOIN builds ON builds.pkg_id = packages.id \ JOIN repositories_builds ON builds.id = repositories_builds.build_id \ WHERE packages.arch = %s AND repositories_builds.repo_id = %s", @@ -345,7 +345,7 @@ class Repository(base.Object): noarch = self.pakfire.arches.get_by_name("noarch") assert noarch - pkgs = self.db.query("SELECT packages.id AS id FROM packages \ + pkgs = self.db.query("SELECT packages.id AS id, packages.path AS path FROM packages \ JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \ JOIN jobs ON jobs_packages.job_id = jobs.id \ JOIN builds ON builds.id = jobs.build_id \ @@ -354,7 +354,19 @@ class Repository(base.Object): repositories_builds.repo_id = %s", arch.id, noarch.id, self.id) - return sorted([packages.Package(self.pakfire, p.id) for p in pkgs]) + return pkgs + + def get_packages(self, arch): + pkgs = [packages.Package(self.pakfire, p.id) for p in self._get_packages(arch)] + pkgs.sort() + + return pkgs + + def get_paths(self, arch): + paths = [p.path for p in self._get_packages(arch)] + paths.sort() + + return paths @property def packages(self): diff --git a/backend/scheduler.py b/backend/scheduler.py new file mode 100644 index 00000000..80f5fc4b --- /dev/null +++ b/backend/scheduler.py @@ -0,0 +1,83 @@ +#!/usr/bin/python + +import logging +import time +import traceback + +class Event(object): + interval = None + + priority = 0 + + def __init__(self, *arguments): + self.arguments = arguments + + self._next_start_time = 0 + + self.scheduler = None + + def __repr__(self): + if hasattr(self, "_next_start_time"): + return "<%s next_start_in=%ds>" % \ + (self.__class__.__name__, self._next_start_time - time.time()) + + return "<%s>" % self.__class__.__name__ + + def run(self, *args, **kwargs): + raise NotImplemented + + +class Scheduler(object): + def __init__(self): + self._queue = [] + + def add_event(self, event, start_time=None): + event.scheduler = self + + self._queue.append(event) + + # Set initial start time. + if start_time is None: + start_time = time.time() + + event._next_start_time = start_time + + def sort_queue(self): + self._queue.sort(key=lambda e: (e.priority, e._next_start_time)) + + def run(self): + while self._queue: + self.sort_queue() + + for event in self._queue: + # If the event has to be started some time in + # the future. + if event._next_start_time <= time.time(): + try: + logging.info("Running %s..." % event) + + event.run(*event.arguments) + + # In case the user interrupts the scheduler. + except KeyboardInterrupt: + # Stop immediately. + return + + except: + traceback.print_exc() + + finally: + # Set the next execution time if the event + # should be run again. + if event.interval: + event._next_start_time = time.time() + event.interval + + # Otherwise remove it from the queue. + else: + self._queue.remove(event) + + # Get back to outer loop and sort the queue again. + break + + # Sleep a bit. 
+ time.sleep(1) diff --git a/manager/__init__.py b/manager/__init__.py new file mode 100644 index 00000000..d3713a4d --- /dev/null +++ b/manager/__init__.py @@ -0,0 +1,29 @@ +#!/usr/bin/python + +import base + +from bugs import BugsUpdateEvent +from builds import BuildsFailedRestartEvent, CheckBuildDependenciesEvent +from builds import CreateTestBuildsEvent +from messages import MessagesSendEvent +from repositories import RepositoriesUpdateEvent +from sessions import SessionsCleanupEvent +from sources import SourcesPullEvent +from uploads import UploadsCleanupEvent + + +# Events that do not fit anywhere else. + +class CleanupFilesEvent(base.Event): + """ + Removes all files that are not needed anymore. + (scratch builds, logs, etc.) + """ + # Run once in 5 minutes. + interval = 300 + + # Intermediate priority. + priority = 5 + + def run(self): + self.pakfire.cleanup_files() diff --git a/manager/base.py b/manager/base.py new file mode 100644 index 00000000..7d0b1713 --- /dev/null +++ b/manager/base.py @@ -0,0 +1,13 @@ +#!/usr/bin/python + +import backend.scheduler + +class Event(backend.scheduler.Event): + def __init__(self, pakfire, *args, **kwargs): + backend.scheduler.Event.__init__(self, *args, **kwargs) + + self.pakfire = pakfire + + @property + def db(self): + return self.pakfire.db diff --git a/manager/bugs.py b/manager/bugs.py new file mode 100644 index 00000000..10dcaa4a --- /dev/null +++ b/manager/bugs.py @@ -0,0 +1,39 @@ +#!/usr/bin/python + +import logging + +import base + +class BugsUpdateEvent(base.Event): + # User feedback gets a high priority. + priority = 1 + + @property + def interval(self): + return self.pakfire.settings.get_int("bugzilla_update_interval", 60) + + def run(self): + # Get up to ten updates. + query = self.db.query("SELECT * FROM builds_bugs_updates \ + WHERE error = 'N' ORDER BY time") + + # XXX CHECK IF BZ IS ACTUALLY REACHABLE AND WORKING + + for update in query: + try: + bug = self.pakfire.bugzilla.get_bug(update.bug_id) + if not bug: + logging.error("Bug #%s does not exist." % update.bug_id) + continue + + # Set the changes. + bug.set_status(update.status, update.resolution, update.comment) + + except Exception, e: + # If there was an error, we save that and go on. + self.db.execute("UPDATE builds_bugs_updates SET error = 'Y', error_msg = %s \ + WHERE id = %s", "%s" % e, update.id) + + else: + # Remove the update when it has been done successfully. + self.db.execute("DELETE FROM builds_bugs_updates WHERE id = %s", update.id) diff --git a/manager/builds.py b/manager/builds.py new file mode 100644 index 00000000..ab1c5f68 --- /dev/null +++ b/manager/builds.py @@ -0,0 +1,334 @@ +#!/usr/bin/python + +import datetime +import logging + +import base + +class BuildsFailedRestartEvent(base.Event): + # Run when idle. 
+ priority = 5 + + @property + def interval(self): + return self.pakfire.settings.get_int("build_keepalive_interval", 900) + + def run(self): + max_tries = self.pakfire.settings.get_int("builds_restart_max_tries", 9) + + query = self.db.query("SELECT jobs.id AS id FROM jobs \ + JOIN builds ON builds.id = jobs.build_id \ + WHERE \ + jobs.type = 'build' AND \ + jobs.state = 'failed' AND \ + jobs.tries <= %s AND \ + NOT builds.state = 'broken' AND \ + jobs.time_finished < DATE_SUB(NOW(), INTERVAL 72 HOUR) \ + ORDER BY \ + CASE \ + WHEN jobs.type = 'build' THEN 0 \ + WHEN jobs.type = 'test' THEN 1 \ + END, \ + builds.priority DESC, jobs.time_created ASC", + max_tries) + + for row in query: + job = self.pakfire.jobs.get_by_id(row.id) + + # Restart the job. + job.set_state("new", log=False) + + +class CheckBuildDependenciesEvent(base.Event): + # Process them as quickly as possible, but there may be more important events. + priority = 3 + + @property + def interval(self): + return self.pakfire.settings.get_int("dependency_checker_interval", 30) + + def run(self): + query = self.db.query("SELECT id FROM jobs \ + WHERE state = 'new' OR \ + (state = 'dependency_error' AND \ + time_finished < DATE_SUB(NOW(), INTERVAL 5 MINUTE)) \ + ORDER BY time_finished LIMIT 50") + + for row in query: + e = CheckBuildDependencyEvent(self.pakfire, row.id) + self.scheduler.add_event(e) + + +class CheckBuildDependencyEvent(base.Event): + # Process them as quickly as possible, but there may be more important events. + priority = 3 + + def run(self, job_id): + # Get the build job we are working on. + job = self.pakfire.jobs.get_by_id(job_id) + if not job: + logging.debug("Job %s does not exist." % job_id) + return + + # Check if the job status has changed in the meanwhile. + if not job.state in ("new", "dependency_error", "failed"): + logging.warning("Job status has already changed: %s - %s" % (job.name, job.state)) + return + + # Resolve the dependencies. + job.resolvdep() + + +class CreateTestBuildsEvent(base.Event): + # Run this every five minutes. + interval = 300 + + # Run when the build service is idle. + priority = 10 + + @property + def test_threshold(self): + threshold_days = self.pakfire.settings.get_int("test_threshold_days", 14) + + return datetime.datetime.utcnow() - datetime.timedelta(days=threshold_days) + + def run(self): + max_queue_length = self.pakfire.settings.get_int("test_queue_limit", 10) + + # Get a list with all feasible architectures. + arches = self.pakfire.arches.get_all() + noarch = self.pakfire.arches.get_by_name("noarch") + if noarch: + arches.append(noarch) + + for arch in arches: + # Get the job queue for each architecture. + queue = self.pakfire.jobs.get_next(arches=[arch,]) + + # Skip adding new jobs if there are more too many jobs in the queue. + limit = max_queue_length - len(queue) + if limit <= 0: + logging.debug("Already too many jobs in queue of %s to create tests." % arch.name) + continue + + # Get a list of builds, with potentially need a test build. + # Randomize the output and do not return more jobs than we are + # allowed to put into the build queue. + builds = self.pakfire.builds.needs_test(self.test_threshold, + arch=arch, limit=limit) + + if not builds: + logging.debug("No builds needs a test for %s." % arch.name) + continue + + # Search for the job with the right architecture in each + # build and schedule a test job. 
+ for build in builds: + for job in build.jobs: + if job.arch == arch: + job.schedule("test") + break + + +class DistManager(object): + process = None + + first_run = True + + def get_next_commit(self): + commits = self.pakfire.sources.get_pending_commits(limit=1) + + if not commits: + return + + return commits[0] + + @property + def timeout(self): + # If there are commits standing in line, we try to restart as soon + # as possible. + if self.get_next_commit(): + return 0 + + # Otherwise we wait at least for a minute. + return 60 + + def do(self): + if self.first_run: + self.first_run = False + + self.process = self.init_repos() + + if self.process: + # If the process is still running, we check back in a couple of + # seconds. + if self.process.is_alive(): + return 1 + + # The process has finished its work. Clear everything up and + # go on. + self.commit = self.process = None + + # Search for a new commit to proceed with. + self.commit = commit = self.get_next_commit() + + # If no commit is there, we just wait for a minute. + if not commit: + return 60 + + # Got a commit to process. + commit.state = "running" + + logging.debug("Processing commit %s: %s" % (commit.revision, commit.subject)) + + # Get the repository of this commit. + repo = backend.git.Repo(self.pakfire, commit.source_id) + + # Make sure, it is checked out. + if not repo.cloned: + repo.clone() + + # Navigate to the right revision. + repo.checkout(commit.revision) + + # Get all changed makefiles. + deleted_files = [] + updated_files = [] + + for file in repo.changed_files(commit.revision): + # Don't care about files that are not a makefile. + if not file.endswith(".%s" % MAKEFILE_EXTENSION): + continue + + if os.path.exists(file): + updated_files.append(file) + else: + deleted_files.append(file) + + self.process = self.fork(commit_id=commit.id, updated_files=updated_files, + deleted_files=deleted_files) + + return 1 + + def fork(self, source_id=None, commit_id=None, updated_files=[], deleted_files=[]): + # Create the Process object. + process = multiprocessing.Process( + target=self._process, + args=(source_id, commit_id, updated_files, deleted_files) + ) + + # The process is running in daemon mode so it will try to kill + # all child processes when exiting. + process.daemon = True + + # Start the process. + process.start() + logging.debug("Started new process pid=%s." % process.pid) + + return process + + def init_repos(self): + # Create the Process object. + process = multiprocessing.Process( + target=self._init_repos, + ) + + # The process is running in daemon mode so it will try to kill + # all child processes when exiting. + #process.daemon = True + + # Start the process. + process.start() + logging.debug("Started new process pid=%s." 
% process.pid) + + return process + + def _init_repos(self): + _pakfire = main.Pakfire() + sources = _pakfire.sources.get_all() + + for source in sources: + if source.revision: + continue + + repo = GitRepo(_pakfire, source.id) + if not repo.cloned: + repo.clone() + + files = repo.get_all_files() + + for file in files: + if not file.endswith(".%s" % MAKEFILE_EXTENSION): + continue + + #files = [f for f in files if f.endswith(".%s" % MAKEFILE_EXTENSION)] + + process = self.fork(source_id=source.id, updated_files=[file,], deleted_files=[]) + + while process.is_alive(): + time.sleep(1) + continue + + @staticmethod + def _process(source_id, commit_id, updated_files, deleted_files): + _pakfire = main.Pakfire() + + commit = None + source = None + + if commit_id: + commit = _pakfire.sources.get_commit_by_id(commit_id) + assert commit + + source = commit.source + + if source_id and not source: + source = _pakfire.sources.get_by_id(source_id) + + assert source + + if updated_files: + # Create a temporary directory where to put all the files + # that are generated here. + pkg_dir = tempfile.mkdtemp() + + try: + config = pakfire.config.Config(["general.conf",]) + config.parse(source.distro.get_config()) + + p = pakfire.Pakfire(mode="server", config=config) + + pkgs = [] + for file in updated_files: + try: + pkg_file = p.dist(file, pkg_dir) + pkgs.append(pkg_file) + except: + raise + + # Import all packages in one swoop. + for pkg in pkgs: + # Import the package file and create a build out of it. + builds.import_from_package(_pakfire, pkg, + distro=source.distro, commit=commit, type="release") + + except: + if commit: + commit.state = "failed" + + raise + + finally: + if os.path.exists(pkg_dir): + shutil.rmtree(pkg_dir) + + for file in deleted_files: + # Determine the name of the package. + name = os.path.basename(file) + name = name[:len(MAKEFILE_EXTENSION) + 1] + + if commit: + commit.distro.delete_package(name) + + if commit: + commit.state = "finished" diff --git a/manager/messages.py b/manager/messages.py new file mode 100644 index 00000000..ade15449 --- /dev/null +++ b/manager/messages.py @@ -0,0 +1,22 @@ +#!/usr/bin/python + +import base + +class MessagesSendEvent(base.Event): + # Emails should be sent out as quickly as possible. + priority = 0 + + @property + def interval(self): + return self.pakfire.settings.get_int("messages_interval", 10) + + def run(self): + for msg in self.pakfire.messages.get_all(): + try: + self.pakfire.messages.send_msg(msg) + + except: + continue + + # If everything was okay, we can delete the message in the database. + self.pakfire.messages.delete(msg.id) diff --git a/manager/repositories.py b/manager/repositories.py new file mode 100644 index 00000000..b6c5c9a5 --- /dev/null +++ b/manager/repositories.py @@ -0,0 +1,124 @@ +#!/usr/bin/python + +import logging +import os +import pakfire + +import base + +from backend.constants import * + +class RepositoriesUpdateEvent(base.Event): + priority = 6 + + @property + def timeout(self): + return self.settings.get_int("repository_update_interval", 600) + + def run(self): + for distro in self.pakfire.distros.get_all(): + for repo in distro.repositories: + # Skip repostories that do not need an update at all. + if not repo.needs_update(): + logging.info("Repository %s - %s is already up to date." % (distro.name, repo.name)) + continue + + e = RepositoryUpdateEvent(self.pakfire, repo.id) + self.scheduler.add_event(e) + + +class RepositoryUpdateEvent(base.Event): + # This is an important task, but it may take a while to process. 
+ priority = 5 + + def run(self, repo_id): + repo = self.pakfire.repos.get_by_id(repo_id) + assert repo + + logging.info("Going to update repository %s..." % repo.name) + + # Update the timestamp when we started at last. + repo.updated() + + # Find out for which arches this repository is created. + arches = repo.arches + + # Add the source repository. + arches.append(self.pakfire.arches.get_by_name("src")) + + for arch in arches: + changed = False + + # Get all package paths that are to be included in this repository. + paths = repo.get_paths(arch) + + repo_path = os.path.join( + REPOS_DIR, + repo.distro.identifier, + repo.identifier, + arch.name + ) + + if not os.path.exists(repo_path): + os.makedirs(repo_path) + + source_files = [] + remove_files = [] + + for filename in os.listdir(repo_path): + path = os.path.join(repo_path, filename) + + if not os.path.isfile(path): + continue + + remove_files.append(path) + + for path in paths: + filename = os.path.basename(path) + + source_file = os.path.join(PACKAGES_DIR, path) + target_file = os.path.join(repo_path, filename) + + # Do not add duplicate files twice. + if source_file in source_files: + continue + + source_files.append(source_file) + + try: + remove_files.remove(target_file) + except ValueError: + changed = True + + if remove_files: + changed = True + + # If nothing in the repository data has changed, there + # is nothing to do. + if changed: + logging.info("The repository has updates...") + else: + logging.info("Nothing to update.") + continue + + # Find the key to sign the package. + key_id = None + if repo.key: + key_id = repo.key.fingerprint + + # Create package index. + p = pakfire.PakfireServer(arch=arch.name) + + p.repo_create(repo_path, source_files, + name="%s - %s.%s" % (repo.distro.name, repo.name, arch.name), + key_id=key_id, type=arch.build_type) + + # Remove files afterwards. + for file in remove_files: + file = os.path.join(repo_path, file) + + try: + os.remove(file) + except OSError: + logging.warning("Could not remove %s." % file) + diff --git a/manager/sessions.py b/manager/sessions.py new file mode 100644 index 00000000..283a756e --- /dev/null +++ b/manager/sessions.py @@ -0,0 +1,17 @@ +#!/usr/bin/python + +import base + +class SessionsCleanupEvent(base.Event): + """ + Cleans up sessions that are not valid anymore. + Keeps the database smaller. + """ + # Run once in an hour. + interval = 3600 + + # Rather unimportant when this runs. + priority = 10 + + def run(self): + self.pakfire.sessions.cleanup() diff --git a/manager/sources.py b/manager/sources.py new file mode 100644 index 00000000..39b0ed74 --- /dev/null +++ b/manager/sources.py @@ -0,0 +1,30 @@ +#!/usr/bin/python + +import backend.git + +import base + +class SourcesPullEvent(base.Event): + # This should run whenever possible, so the user can see his commits + # very quickly in the build service. + priority = 1 + + @property + def interval(self): + return self.pakfire.settings.get_int("source_update_interval", 60) + + def run(self): + for source in self.pakfire.sources.get_all(): + repo = backend.git.Repo(self.pakfire, source.id, mode="mirror") + + # If the repository is not yet cloned, we need to make a local + # clone to work with. + if not repo.cloned: + repo.clone() + + # Otherwise we just fetch updates. + else: + repo.fetch() + + # Import all new revisions. 
+ repo.import_revisions() diff --git a/manager/uploads.py b/manager/uploads.py new file mode 100644 index 00000000..2cbfe9de --- /dev/null +++ b/manager/uploads.py @@ -0,0 +1,12 @@ +#!/usr/bin/python + +import base + +class UploadsCleanupEvent(base.Event): + interval = 3600 + + # Rather unimportant when this runs. + priority = 10 + + def run(self): + self.pakfire.uploads.cleanup() diff --git a/pakfire-manager b/pakfire-manager index 956f9c53..b84d1463 100644 --- a/pakfire-manager +++ b/pakfire-manager @@ -1,54 +1,39 @@ #!/usr/bin/python -import logging -import os.path -import time -import tornado.ioloop -import tornado.options +import backend.main +import backend.scheduler -import backend -from backend.managers import * +import manager +# Use tornado's logging options. +import tornado.options tornado.options.parse_command_line() -BASEDIR = os.path.dirname(__file__) - -class Daemon(object): - def __init__(self): - self._managers = [] - - self.ioloop.set_blocking_log_threshold(300) - - config_file = os.path.join(BASEDIR, "pbs.conf") - self.pakfire = backend.Pakfire(config_file=config_file) - - @property - def ioloop(self): - return tornado.ioloop.IOLoop.instance() - - def add(self, manager_cls): - logging.info("Registering new manager: %s" % manager_cls.__name__) - manager = manager_cls(self.pakfire) - self._managers.append(manager) - - def run(self): - """ - Main loop. - """ - for manager in self._managers: - manager.pc.start() - - self.ioloop.start() - - def shutdown(self): - self.ioloop.stop() - - - - -if __name__ == "__main__": - d = Daemon() - for manager in managers: - d.add(manager) - - d.run() +# main + +# Create Scheduler instance. +s = backend.scheduler.Scheduler() + +# Create Pakfire instance. +p = backend.main.Pakfire() + +events = ( + manager.BugsUpdateEvent, + manager.BuildsFailedRestartEvent, + manager.CheckBuildDependenciesEvent, + manager.CleanupFilesEvent, + manager.CreateTestBuildsEvent, + manager.MessagesSendEvent, + manager.RepositoriesUpdateEvent, + manager.SessionsCleanupEvent, + manager.SourcesPullEvent, + manager.UploadsCleanupEvent, +) + +# Add all events to the scheduler. +for e in events: + i = e(p) + s.add_event(i) + +# Run the scheduler. +s.run()
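
The pakfire-manager script above shows the core of this import: the tornado PeriodicCallback based managers from backend/managers.py are replaced by the small cooperative scheduler in backend/scheduler.py, where every recurring task is an Event subclass carrying a priority and an interval, and Scheduler.run() keeps sorting the queue and firing whatever is due. As a rough sketch of how a further periodic task could be hooked into this design, the snippet below registers one extra event next to the standard ones. The MirrorCheckEvent class and its body are purely illustrative and not part of this commit; the sketch also assumes the repository root is on PYTHONPATH so that backend and manager import as above, and that a usable pbs.conf is in place, since backend.main.Pakfire() reads it on construction.

#!/usr/bin/python

import logging

import backend.main
import backend.scheduler

import manager.base

class MirrorCheckEvent(manager.base.Event):
    """
    Illustrative example task (not part of this commit): it only
    logs how many source repositories are currently configured.
    """
    # Run every ten minutes.
    interval = 600

    # Not urgent; user feedback events (messages, bug updates) go first.
    priority = 10

    def run(self):
        sources = self.pakfire.sources.get_all()

        logging.info("Tracking %s source repositories." % len(sources))

if __name__ == "__main__":
    # Make sure INFO messages show up on the console.
    logging.basicConfig(level=logging.INFO)

    # Same bootstrap as in the pakfire-manager script above.
    s = backend.scheduler.Scheduler()
    p = backend.main.Pakfire()

    s.add_event(MirrorCheckEvent(p))

    # Runs until the queue is empty or the user interrupts it.
    s.run()

Because the event defines an interval, Scheduler.run() re-arms it after every execution instead of dropping it from the queue, so a single add_event() call is enough to make the task recur for the lifetime of the daemon.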