git.ipfire.org Git - pbs.git/commitdiff
Initial import of the new manager daemon.
author    Michael Tremer <michael.tremer@ipfire.org>
          Thu, 13 Dec 2012 23:18:54 +0000 (00:18 +0100)
committer Michael Tremer <michael.tremer@ipfire.org>
          Thu, 13 Dec 2012 23:18:54 +0000 (00:18 +0100)
16 files changed:
backend/builds.py
backend/git.py [new file with mode: 0644]
backend/main.py
backend/managers.py [deleted file]
backend/repository.py
backend/scheduler.py [new file with mode: 0644]
manager/__init__.py [new file with mode: 0644]
manager/base.py [new file with mode: 0644]
manager/bugs.py [new file with mode: 0644]
manager/builds.py [new file with mode: 0644]
manager/messages.py [new file with mode: 0644]
manager/repositories.py [new file with mode: 0644]
manager/sessions.py [new file with mode: 0644]
manager/sources.py [new file with mode: 0644]
manager/uploads.py [new file with mode: 0644]
pakfire-manager

index ab5ce0310b451a32fb681225fd6cf4e239239156..9d36fc0e1011f3c85a1399764db0ae55952cdfb9 100644 (file)
@@ -193,7 +193,8 @@ class Builds(base.Object):
                                        jobs.state = 'finished' AND \
                                        jobs.time_finished < %s \
                                ) \
-                       AND builds.type = 'release' AND NOT builds.state = 'broken'"
+                       AND builds.type = 'release' \
+                       AND (builds.state = 'stable' OR builds.state = 'testing')"
                args  = [arch.id, threshold, arch.id, threshold]
 
                if randomize:
@@ -1302,8 +1303,11 @@ class Jobs(base.Object):
                # Return sorted list of jobs.
                return sorted(jobs)
 
-       def get_active(self, host_id=None, uploads=True):
-               running_states = ["dispatching", "new", "pending", "running"]
+       def get_active(self, host_id=None, uploads=True, running_only=False):
+               running_states = ["dispatching", "running"]
+
+               if not running_only:
+                       running_states += ["new", "pending",]
 
                if uploads:
                        running_states.append("uploading")
@@ -2152,7 +2156,7 @@ class Job(base.Object):
 
                # Create a new pakfire instance with the configuration for
                # this build.
-               p = pakfire.Pakfire(mode="server", config=config, arch=self.arch.name)
+               p = pakfire.PakfireServer(config=config, arch=self.arch.name)
 
                # Try to solve the build dependencies.
                try:
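
The new running_only switch simply narrows the set of job states that count as "active". A standalone sketch of the selection logic (illustration only; the real method goes on to query the database for jobs in these states):

    def active_states(uploads=True, running_only=False):
        # Mirrors the state selection in Jobs.get_active() above.
        states = ["dispatching", "running"]

        if not running_only:
            states += ["new", "pending"]

        if uploads:
            states.append("uploading")

        return states

    assert active_states() == ["dispatching", "running", "new", "pending", "uploading"]
    assert active_states(running_only=True) == ["dispatching", "running", "uploading"]
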
diff --git a/backend/git.py b/backend/git.py
new file mode 100644 (file)
index 0000000..02d1de7
--- /dev/null
@@ -0,0 +1,147 @@
+#!/usr/bin/python
+
+import datetime
+import logging
+import os
+import shutil
+import subprocess
+
+import base
+import sources
+
+class Repo(base.Object):
+       def __init__(self, pakfire, id, mode="normal"):
+               base.Object.__init__(self, pakfire)
+
+               assert mode in ("normal", "bare", "mirror")
+
+               # Get the source object.
+               self.source = sources.Source(pakfire, id)
+               self.mode = mode
+
+       @property
+       def path(self):
+               return os.path.join("/var/cache/pakfire/git-repos", self.source.identifier, self.mode)
+
+       def git(self, cmd, path=None):
+               if not path:
+                       path = self.path
+
+               cmd = "cd %s && git %s" % (path, cmd)
+
+               logging.debug("Running command: %s" % cmd)
+
+               return subprocess.check_output(["/bin/sh", "-c", cmd])
+
+       @property
+       def cloned(self):
+               """
+                       Tell whether the repository has already been cloned.
+               """
+               return os.path.exists(self.path)
+
+       def clone(self):
+               if self.cloned:
+                       return
+
+               path = os.path.dirname(self.path)
+               repo = os.path.basename(self.path)
+
+               # Create the repository home directory if it does not exist.
+               if not os.path.exists(path):
+                       os.makedirs(path)
+
+               command = ["clone"]
+               if self.mode == "bare":
+                       command.append("--bare")
+               elif self.mode == "mirror":
+                       command.append("--mirror")
+
+               command.append(self.source.url)
+               command.append(repo)
+
+               # Clone the repository.
+               try:
+                       self.git(" ".join(command), path=path)
+               except Exception:
+                       shutil.rmtree(self.path)
+                       raise
+
+       def fetch(self):
+               # Make sure the repository has been cloned.
+               if not self.cloned:
+                       self.clone()
+
+               self.git("fetch")
+
+       def rev_list(self, revision=None):
+               if not revision:
+                       if self.source.head_revision:
+                               revision = self.source.head_revision.revision
+                       else:
+                               revision = self.source.start_revision
+
+               command = "rev-list %s..%s" % (revision, self.source.branch)
+
+               # Get all merge commits.
+               merges = self.git("%s --merges" % command).splitlines()
+
+               revisions = []
+               for commit in self.git(command).splitlines():
+                       # Check if commit is a normal commit or merge commit.
+                       merge = commit in merges
+
+                       revisions.append((commit, merge))
+
+               return [r for r in reversed(revisions)]
+
+       def import_revisions(self):
+               # Get all pending revisions.
+               revisions = self.rev_list()
+
+               for revision, merge in revisions:
+                       # Actually import the revision.
+                       self._import_revision(revision, merge)
+
+       def _import_revision(self, revision, merge):
+               logging.debug("Going to import revision %s (merge: %s)." % (revision, merge))
+
+               rev_author    = self.git("log -1 --format=\"%%an <%%ae>\" %s" % revision)
+               rev_committer = self.git("log -1 --format=\"%%cn <%%ce>\" %s" % revision)
+               rev_subject   = self.git("log -1 --format=\"%%s\" %s" % revision)
+               rev_body      = self.git("log -1 --format=\"%%b\" %s" % revision)
+               rev_date      = self.git("log -1 --format=\"%%at\" %s" % revision)
+               rev_date      = datetime.datetime.utcfromtimestamp(float(rev_date))
+
+               # Convert strings properly. No idea why I have to do that.
+               #rev_author    = rev_author.decode("latin-1").strip()
+               #rev_committer = rev_committer.decode("latin-1").strip()
+               #rev_subject   = rev_subject.decode("latin-1").strip()
+               #rev_body      = rev_body.decode("latin-1").rstrip()
+
+               # Create a new commit object in the database
+               commit = sources.Commit.create(self.pakfire, self.source, revision,
+                       rev_author, rev_committer, rev_subject, rev_body, rev_date)
+
+       def checkout(self, revision, update=False):
+               # Try the checkout; if it fails, fetch and try once more.
+               for update in (update, True):
+                       if update:
+                               self.fetch()
+
+                       try:
+                               self.git("checkout %s" % revision)
+                               return
+
+                       except subprocess.CalledProcessError:
+                               if not update:
+                                       continue
+
+                               raise
+
+       def changed_files(self, revision):
+               files = self.git("diff --name-only %s^ %s" % (revision, revision))
+
+               return [os.path.join(self.path, f) for f in files.splitlines()]
+
+       def get_all_files(self):
+               files = self.git("ls-files")
+
+               return [os.path.join(self.path, f) for f in files.splitlines()]
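
A rough usage sketch of the new Repo class (hypothetical: the source id 1 is made up, the script is assumed to run from within the backend package, and main.Pakfire() relies on the new "pbs.conf" default shown below):

    import main
    import git

    pakfire = main.Pakfire()                    # reads "pbs.conf" by default
    repo = git.Repo(pakfire, 1, mode="mirror")  # source id 1 is hypothetical

    repo.fetch()             # clones on first use, then fetches updates
    repo.import_revisions()  # imports all pending commits into the database
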
index be24867d3832b56894ede0d5e08f31c5e7c39b8f..e27e5cca1384dd4d6b3505ab295674762714f82c 100644 (file)
@@ -28,7 +28,7 @@ import users
 from constants import *
 
 class Pakfire(object):
-       def __init__(self, config_file):
+       def __init__(self, config_file="pbs.conf"):
                # Read configuration file.
                self.config = self.read_config(config_file)
 
diff --git a/backend/managers.py b/backend/managers.py
deleted file mode 100644 (file)
index eeeb682..0000000
+++ /dev/null
@@ -1,940 +0,0 @@
-#!/usr/bin/python
-
-import datetime
-import logging
-import multiprocessing
-import os
-import shutil
-import subprocess
-import tempfile
-import time
-import tornado.ioloop
-
-import pakfire
-import pakfire.api
-import pakfire.config
-from pakfire.constants import *
-
-import base
-import builds
-import main
-import packages
-import sources
-
-from constants import *
-
-managers = []
-
-class Manager(base.Object):
-       def __init__(self, pakfire):
-               base.Object.__init__(self, pakfire)
-
-               self.pc = tornado.ioloop.PeriodicCallback(self, self.timeout * 1000)
-
-               logging.info("%s was initialized." % self.__class__.__name__)
-
-               self()
-
-       def __call__(self):
-               logging.info("%s main method was called." % self.__class__.__name__)
-
-               timeout = self.do()
-
-               if timeout is None:
-                       timeout = self.timeout
-
-               # Update callback_time.
-               self.pc.callback_time = timeout * 1000
-               logging.debug("Next call will be in ~%.2f seconds." % timeout)
-
-       @property
-       def timeout(self):
-               """
-                       Return a new callback timeout in seconds.
-               """
-               raise NotImplementedError
-
-       def do(self):
-               raise NotImplementedError
-
-
-       # Helper functions
-
-       def wait_for_processes(self):
-               ALIVE_CHECK_INTERVAL = 0.5
-
-               logging.debug("There are %s process(es) in the queue." % len(self.processes))
-
-               # Nothing to do, if there are no processes running.
-               if not self.processes:
-                       return
-
-               # Get the currently running process.
-               process = self.processes[0]
-
-               # If the first process is running, everything is okay and
-               # we'll have to wait.
-               if process.is_alive():
-                       return ALIVE_CHECK_INTERVAL
-
-               # If the process has not been run, it is started now.
-               if process.exitcode is None:
-                       logging.debug("Process %s is being started..." % process)
-
-                       process.start()
-                       return ALIVE_CHECK_INTERVAL
-
-               # If the process has stopped working we check why...
-               elif process.exitcode == 0:
-                       logging.debug("Process %s has successfully finished." % process)
-
-               elif process.exitcode > 0:
-                       logging.error("Process %s has exited with code: %s" % \
-                               (process, process.exitcode))
-
-               elif process.exitcode < 0:
-                       logging.error("Process %s has ended with signal %s" % \
-                               (process, process.exitcode))
-
-               # Remove process from process queue.
-               self.processes.remove(process)
-
-               # If there are still processes in the queue, we start this function
-               # again...
-               if self.processes:
-                       return self.wait_for_processes()
-
-
-class MessagesManager(Manager):
-       @property
-       def messages(self):
-               """
-                       Shortcut to messages object.
-               """
-               return self.pakfire.messages
-
-       @property
-       def timeout(self):
-               # If we have messages, we should run as soon as possible.
-               if self.messages.count:
-                       return 0
-
-               # Otherwise we sleep for "messages_interval".
-               return self.settings.get_int("messages_interval", 10)
-
-       def do(self):
-               logging.info("Sending a bunch of messages.")
-
-               # Send up to 25 messages and return.
-               i = 0
-               for msg in self.messages.get_all(limit=25):
-                       try:
-                               self.messages.send_msg(msg)
-
-                       except Exception, e:
-                               logging.critical("There was an error sending mail: %s" % e)
-                               raise
-
-                       else:
-                               # If everything was okay, we can delete the message in the database.
-                               self.messages.delete(msg.id)
-                               i += 1
-
-               count = self.messages.count
-
-               logging.debug("Successfully sent %s message(s). %s still in queue." \
-                       % (i, count))
-
-               # If there are still mails left, we start again as soon as possible.
-               if count:
-                       return 0
-
-               return self.settings.get_int("messages_interval", 10)
-
-
-managers.append(MessagesManager)
-
-class BugsUpdateManager(Manager):
-       @property
-       def timeout(self):
-               return self.settings.get_int("bugzilla_update_interval", 60)
-
-       def do(self):
-               # Get up to ten updates.
-               query = self.db.query("SELECT * FROM builds_bugs_updates \
-                       WHERE error = 'N' ORDER BY time LIMIT 10")
-
-               # XXX CHECK IF BZ IS ACTUALLY REACHABLE AND WORKING
-
-               for update in query:
-                       try:
-                               bug = self.pakfire.bugzilla.get_bug(update.bug_id)
-                               if not bug:
-                                       logging.error("Bug #%s does not exist." % update.bug_id)
-                                       continue
-
-                               # Set the changes.
-                               bug.set_status(update.status, update.resolution, update.comment)
-
-                       except Exception, e:
-                               # If there was an error, we save that and go on.
-                               self.db.execute("UPDATE builds_bugs_updates SET error = 'Y', error_msg = %s \
-                                       WHERE id = %s", "%s" % e, update.id)
-
-                       else:
-                               # Remove the update when it has been done successfully.
-                               self.db.execute("DELETE FROM builds_bugs_updates WHERE id = %s", update.id)
-
-
-managers.append(BugsUpdateManager)     
-
-class GitRepo(base.Object):
-       def __init__(self, pakfire, id, mode="normal"):
-               base.Object.__init__(self, pakfire)
-
-               assert mode in ("normal", "bare", "mirror")
-
-               # Get the source object.
-               self.source = sources.Source(pakfire, id)
-               self.mode = mode
-
-       @property
-       def path(self):
-               return os.path.join("/var/cache/pakfire/git-repos", self.source.identifier, self.mode)
-
-       def git(self, cmd, path=None):
-               if not path:
-                       path = self.path
-
-               cmd = "cd %s && git %s" % (path, cmd)
-
-               logging.debug("Running command: %s" % cmd)
-
-               return subprocess.check_output(["/bin/sh", "-c", cmd])
-
-       @property
-       def cloned(self):
-               """
-                       Say if the repository is already cloned.
-               """
-               return os.path.exists(self.path)
-
-       def clone(self):
-               if self.cloned:
-                       return
-
-               path = os.path.dirname(self.path)
-               repo = os.path.basename(self.path)
-
-               # Create the repository home directory if not exists.
-               if not os.path.exists(path):
-                       os.makedirs(path)
-
-               command = ["clone"]
-               if self.mode == "bare":
-                       command.append("--bare")
-               elif self.mode == "mirror":
-                       command.append("--mirror")
-
-               command.append(self.source.url)
-               command.append(repo)
-
-               # Clone the repository.
-               try:
-                       self.git(" ".join(command), path=path)
-               except Exception:
-                       shutil.rmtree(self.path)
-                       raise
-
-       def fetch(self):
-               # Make sure, the repository was already cloned.
-               if not self.cloned:
-                       self.clone()
-
-               self.git("fetch")
-
-       def rev_list(self, revision=None):
-               if not revision:
-                       if self.source.head_revision:
-                               revision = self.source.head_revision.revision
-                       else:
-                               revision = self.source.start_revision
-
-               command = "rev-list %s..%s" % (revision, self.source.branch)
-
-               # Get all merge commits.
-               merges = self.git("%s --merges" % command).splitlines()
-
-               revisions = []
-               for commit in self.git(command).splitlines():
-                       # Check if commit is a normal commit or merge commit.
-                       merge = commit in merges
-
-                       revisions.append((commit, merge))
-
-               return [r for r in reversed(revisions)]
-
-       def import_revisions(self):
-               # Get all pending revisions.
-               revisions = self.rev_list()
-
-               for revision, merge in revisions:
-                       # Actually import the revision.
-                       self._import_revision(revision, merge)
-
-       def _import_revision(self, revision, merge):
-               logging.debug("Going to import revision %s (merge: %s)." % (revision, merge))
-
-               rev_author    = self.git("log -1 --format=\"%%an <%%ae>\" %s" % revision)
-               rev_committer = self.git("log -1 --format=\"%%cn <%%ce>\" %s" % revision)
-               rev_subject   = self.git("log -1 --format=\"%%s\" %s" % revision)
-               rev_body      = self.git("log -1 --format=\"%%b\" %s" % revision)
-               rev_date      = self.git("log -1 --format=\"%%at\" %s" % revision)
-               rev_date      = datetime.datetime.utcfromtimestamp(float(rev_date))
-
-               # Convert strings properly. No idea why I have to do that.
-               #rev_author    = rev_author.decode("latin-1").strip()
-               #rev_committer = rev_committer.decode("latin-1").strip()
-               #rev_subject   = rev_subject.decode("latin-1").strip()
-               #rev_body      = rev_body.decode("latin-1").rstrip()
-
-               # Create a new commit object in the database
-               commit = sources.Commit.create(self.pakfire, self.source, revision,
-                       rev_author, rev_committer, rev_subject, rev_body, rev_date)
-
-       def checkout(self, revision, update=False):
-               for update in (0, 1):
-                       if update:
-                               self.fetch()
-
-                       try:
-                               self.git("checkout %s" % revision)
-
-                       except subprocess.CalledProcessError:
-                               if not update:
-                                       continue
-
-                               raise
-
-       def changed_files(self, revision):
-               files = self.git("diff --name-only %s^ %s" % (revision, revision))
-
-               return [os.path.join(self.path, f) for f in files.splitlines()]
-
-       def get_all_files(self):
-               files = self.git("ls-files")
-
-               return [os.path.join(self.path, f) for f in files.splitlines()]
-
-
-class SourceManager(Manager):
-       @property
-       def sources(self):
-               return self.pakfire.sources
-
-       @property
-       def timeout(self):
-               return self.settings.get_int("source_update_interval", 60)
-
-       def do(self):
-               for source in self.sources.get_all():
-                       repo = GitRepo(self.pakfire, source.id, mode="mirror")
-
-                       # If the repository is not yet cloned, we need to make a local
-                       # clone to work with.
-                       if not repo.cloned:
-                               repo.clone()
-
-                               # If we have cloned a new repository, we exit so that we
-                               # do not go over the time threshold.
-                               return 0
-
-                       # Otherwise we just fetch updates.
-                       else:
-                               repo.fetch()
-
-                       # Import all new revisions.
-                       repo.import_revisions()
-
-
-managers.append(SourceManager)
-
-class DistManager(Manager):
-       process = None
-
-       first_run = True
-
-       def get_next_commit(self):
-               commits = self.pakfire.sources.get_pending_commits(limit=1)
-
-               if not commits:
-                       return
-
-               return commits[0]
-
-       @property
-       def timeout(self):
-               # If there are commits standing in line, we try to restart as soon
-               # as possible.
-               if self.get_next_commit():
-                       return 0
-
-               # Otherwise we wait at least for a minute.
-               return 60
-
-       def do(self):
-               if self.first_run:
-                       self.first_run = False
-
-                       self.process = self.init_repos()
-
-               if self.process:
-                       # If the process is still running, we check back in a couple of
-                       # seconds.
-                       if self.process.is_alive():
-                               return 1
-
-                       # The process has finished its work. Clear everything up and
-                       # go on.
-                       self.commit = self.process = None
-
-               # Search for a new commit to proceed with.
-               self.commit = commit = self.get_next_commit()
-
-               # If no commit is there, we just wait for a minute.
-               if not commit:
-                       return 60
-
-               # Got a commit to process.
-               commit.state = "running"
-
-               logging.debug("Processing commit %s: %s" % (commit.revision, commit.subject))
-
-               # Get the repository of this commit.
-               repo = GitRepo(self.pakfire, commit.source_id)
-
-               # Make sure, it is checked out.
-               if not repo.cloned:
-                       repo.clone()
-
-               # Navigate to the right revision.
-               repo.checkout(commit.revision)
-
-               # Get all changed makefiles.
-               deleted_files = []
-               updated_files = []
-
-               for file in repo.changed_files(commit.revision):
-                       # Don't care about files that are not a makefile.
-                       if not file.endswith(".%s" % MAKEFILE_EXTENSION):
-                               continue
-
-                       if os.path.exists(file):
-                               updated_files.append(file)
-                       else:
-                               deleted_files.append(file)
-
-               self.process = self.fork(commit_id=commit.id, updated_files=updated_files,
-                       deleted_files=deleted_files)
-
-               return 1
-
-       def fork(self, source_id=None, commit_id=None, updated_files=[], deleted_files=[]):
-               # Create the Process object.
-               process = multiprocessing.Process(
-                       target=self._process,
-                       args=(source_id, commit_id, updated_files, deleted_files)
-               )
-
-               # The process is running in daemon mode so it will try to kill
-               # all child processes when exiting.
-               process.daemon = True
-
-               # Start the process.
-               process.start()
-               logging.debug("Started new process pid=%s." % process.pid)
-
-               return process
-
-       def init_repos(self):
-               # Create the Process object.
-               process = multiprocessing.Process(
-                       target=self._init_repos,
-               )
-
-               # The process is running in daemon mode so it will try to kill
-               # all child processes when exiting.
-               #process.daemon = True
-
-               # Start the process.
-               process.start()
-               logging.debug("Started new process pid=%s." % process.pid)
-
-               return process
-
-       def _init_repos(self):
-               _pakfire = main.Pakfire()
-               sources = _pakfire.sources.get_all()
-
-               for source in sources:
-                       if source.revision:
-                               continue
-
-                       repo = GitRepo(_pakfire, source.id)
-                       if not repo.cloned:
-                               repo.clone()
-
-                       files = repo.get_all_files()
-
-                       for file in files:
-                               if not file.endswith(".%s" % MAKEFILE_EXTENSION):
-                                       continue
-
-                               #files = [f for f in files if f.endswith(".%s" % MAKEFILE_EXTENSION)]
-
-                               process = self.fork(source_id=source.id, updated_files=[file,], deleted_files=[])
-
-                               while process.is_alive():
-                                       time.sleep(1)
-                                       continue
-
-       @staticmethod
-       def _process(source_id, commit_id, updated_files, deleted_files):
-               _pakfire = main.Pakfire()
-
-               commit = None
-               source = None
-
-               if commit_id:
-                       commit = _pakfire.sources.get_commit_by_id(commit_id)
-                       assert commit
-
-                       source = commit.source
-
-               if source_id and not source:
-                       source = _pakfire.sources.get_by_id(source_id)
-
-               assert source
-
-               if updated_files:
-                       # Create a temporary directory where to put all the files
-                       # that are generated here.
-                       pkg_dir = tempfile.mkdtemp()
-
-                       try:
-                               config = pakfire.config.Config(["general.conf",])
-                               config.parse(source.distro.get_config())
-
-                               p = pakfire.Pakfire(mode="server", config=config)
-
-                               pkgs = []
-                               for file in updated_files:
-                                       try:
-                                               pkg_file = p.dist(file, pkg_dir)
-                                               pkgs.append(pkg_file)
-                                       except:
-                                               raise
-
-                               # Import all packages in one swoop.
-                               for pkg in pkgs:
-                                       # Import the package file and create a build out of it.
-                                       builds.import_from_package(_pakfire, pkg,
-                                               distro=source.distro, commit=commit, type="release")
-
-                       except:
-                               if commit:
-                                       commit.state = "failed"
-
-                               raise
-
-                       finally:
-                               if os.path.exists(pkg_dir):
-                                       shutil.rmtree(pkg_dir)
-
-                       for file in deleted_files:
-                               # Determine the name of the package.
-                               name = os.path.basename(file)
-                               name = name[:len(MAKEFILE_EXTENSION) + 1]
-
-                               if commit:
-                                       commit.distro.delete_package(name)
-
-                       if commit:
-                               commit.state = "finished"
-
-
-managers.append(DistManager)
-
-class BuildsManager(Manager):
-       @property
-       def timeout(self):
-               return self.settings.get_int("build_keepalive_interval", 900)
-
-       def do(self):
-               threshold = datetime.datetime.utcnow() - datetime.timedelta(hours=72)
-
-               for job in self.pakfire.jobs.get_next_iter(type="build", max_tries=9, states=["failed"]):
-                       if job.build.state == "broken":
-                               continue
-
-                       if not job.time_finished or job.time_finished > threshold:
-                               continue
-
-                       # Restart the job.
-                       logging.info("Restarting build job: %s" % job)
-                       job.set_state("new", log=False)
-
-
-managers.append(BuildsManager)
-
-class UploadsManager(Manager):
-       @property
-       def timeout(self):
-               return self.settings.get_int("uploads_remove_interval", 3600)
-
-       def do(self):
-               self.pakfire.uploads.cleanup()
-
-
-managers.append(UploadsManager)
-
-class RepositoryManager(Manager):
-       processes = []
-
-       @property
-       def timeout(self):
-               return self.settings.get_int("repository_update_interval", 600)
-
-       def do(self):
-               for process in self.processes[:]:
-                       # If the first process is running, everything is okay and
-                       # we'll have to wait.
-                       if process.is_alive():
-                               return 0.5
-
-                       # If the process has not been run, it is started now.
-                       if process.exitcode is None:
-                               logging.debug("Process %s is being started..." % process)
-
-                               process.start()
-                               return 1
-
-                       # If the process has stopped working we check why...
-                       else:
-                               if process.exitcode == 0:
-                                       logging.debug("Process %s has successfully finished." % process)
-
-                               elif process.exitcode > 0:
-                                       logging.error("Process %s has exited with code: %s" % \
-                                               (process, process.exitcode))
-
-                               elif process.exitcode < 0:
-                                       logging.error("Process %s has ended with signal %s" % \
-                                               (process, process.exitcode))
-
-                               # Remove process from process queue.
-                               self.processes.remove(process)
-
-                               # Start the loop again if there any processes left
-                               # that need to be started.
-                               if self.processes:
-                                       continue
-
-                               # Otherwise wait some time and start from the beginning.
-                               else:
-                                       return self.settings.get_int("repository_update_interval", 600)
-
-               for distro in self.pakfire.distros.get_all():
-                       for repo in distro.repositories:
-                               # Skip repositories that do not need an update at all.
-                               if not repo.needs_update():
-                                       logging.info("Repository %s - %s is already up to date." % (distro.name, repo.name))
-                                       continue
-
-                               # Create the Process object.
-                               process = multiprocessing.Process(
-                                       target=self._process,
-                                       args=(repo.id,)
-                               )
-
-                               # Run the process in daemon mode.
-                               process.daemon = True
-
-                               # Add the process to the process queue.
-                               self.processes.append(process)
-
-               # XXX DEVEL
-               #if self.processes:
-               #       return 0
-               #else:
-               #       return
-
-               # Create dependency updater after all repositories have been
-               # updated.
-               #jobs = self.pakfire.jobs.get_next_iter(states=["new", "dependency_error", "failed",])
-
-               #for job in jobs:
-               #       process = multiprocessing.Process(
-               #               target=self._dependency_update_process,
-               #               args=(job.id,)
-               #       )
-               #       process.daemon = True
-               #       self.processes.append(process)
-
-               # Start again as soon as possible.
-               #if self.processes:
-               #       return 0
-
-       @staticmethod
-       def _process(repo_id):
-               _pakfire = main.Pakfire()
-
-               repo = _pakfire.repos.get_by_id(repo_id)
-               assert repo
-
-               logging.info("Going to update repository %s..." % repo.name)
-
-               # Update the timestamp when we started at last.
-               repo.updated()
-
-               # Find out for which arches this repository is created.
-               arches = repo.arches
-
-               # Add the source repository.
-               arches.append(_pakfire.arches.get_by_name("src"))
-
-               for arch in arches:
-                       changed = False
-
-                       # Get all packages that are to be included in this repository.
-                       pkgs = repo.get_packages(arch)
-
-                       # Log all packages.
-                       for pkg in pkgs:
-                               logging.info("  %s" % pkg)
-
-                       repo_path = os.path.join(
-                               REPOS_DIR,
-                               repo.distro.identifier,
-                               repo.identifier,
-                               arch.name
-                       )
-
-                       if not os.path.exists(repo_path):
-                               os.makedirs(repo_path)
-
-                       source_files = []
-                       remove_files = []
-
-                       for filename in os.listdir(repo_path):
-                               path = os.path.join(repo_path, filename)
-
-                               if not os.path.isfile(path):
-                                       continue
-
-                               remove_files.append(path)
-
-                       for pkg in pkgs:
-                               filename = os.path.basename(pkg.path)
-
-                               source_file = os.path.join(PACKAGES_DIR, pkg.path)
-                               target_file = os.path.join(repo_path, filename)
-
-                               # Do not add duplicate files twice.
-                               if source_file in source_files:
-                                       #logging.warning("Duplicate file detected: %s" % source_file)
-                                       continue
-
-                               source_files.append(source_file)
-
-                               try:
-                                       remove_files.remove(target_file)
-                               except ValueError:
-                                       changed = True
-
-                       if remove_files:
-                               changed = True
-
-                       # If nothing in the repository data has changed, there
-                       # is nothing to do.
-                       if changed:
-                               logging.info("The repository has updates...")
-                       else:
-                               logging.info("Nothing to update.")
-                               continue
-
-                       # Find the key to sign the package.
-                       key_id = None
-                       if repo.key:
-                               key_id = repo.key.fingerprint
-
-                       # Create package index.
-                       pakfire.api.repo_create(repo_path, source_files,
-                               name="%s - %s.%s" % (repo.distro.name, repo.name, arch.name),
-                               key_id=key_id, type=arch.build_type, mode="server")
-
-                       # Remove files afterwards.
-                       for file in remove_files:
-                               file = os.path.join(repo_path, file)
-
-                               try:
-                                       os.remove(file)
-                               except OSError:
-                                       logging.warning("Could not remove %s." % file)
-
-       @staticmethod
-       def _dependency_update_process(job_id):
-               _pakfire = main.Pakfire()
-
-               job = _pakfire.jobs.get_by_id(job_id)
-               assert job
-
-               job.resolvdep()
-
-
-managers.append(RepositoryManager)
-
-class TestManager(Manager):
-       @property
-       def timeout(self):
-               return 300
-
-       @property
-       def test_threshold(self):
-               threshold_days = self.pakfire.settings.get_int("test_threshold_days", 14)
-
-               return datetime.datetime.utcnow() - datetime.timedelta(days=threshold_days)
-
-       def do(self):
-               max_queue_length = self.pakfire.settings.get_int("test_queue_limit", 10)
-
-               # Get a list with all feasible architectures.
-               arches = self.pakfire.arches.get_all()
-               noarch = self.pakfire.arches.get_by_name("noarch")
-               if noarch:
-                       arches.append(noarch)
-
-               for arch in arches:
-                       # Get the job queue for each architecture.
-                       queue = self.pakfire.jobs.get_next(arches=[arch,])
-
-                       # Skip adding new jobs if there are already too many jobs in the queue.
-                       limit = max_queue_length - len(queue)
-                       if limit <= 0:
-                               logging.debug("Already too many jobs in queue of %s to create tests." % arch.name)
-                               continue
-
-                       # Get a list of builds that potentially need a test build.
-                       # Randomize the output and do not return more jobs than we are
-                       # allowed to put into the build queue.
-                       builds = self.pakfire.builds.needs_test(self.test_threshold,
-                               arch=arch, limit=limit, randomize=True)
-
-                       if not builds:
-                               logging.debug("No builds need a test for %s." % arch.name)
-                               continue
-
-                       # Search for the job with the right architecture in each
-                       # build and schedule a test job.
-                       for build in builds:
-                               for job in build.jobs:
-                                       if not job.arch == arch:
-                                               continue
-
-                                       job.schedule("test")
-                                       break
-
-
-managers.append(TestManager)
-
-
-class DependencyChecker(Manager):
-       processes = []
-
-       @property
-       def timeout(self):
-               return self.settings.get_int("dependency_checker_interval", 30)
-
-       def do(self):
-               if self.processes:
-                       return self.wait_for_processes()
-
-               return self.search_jobs()
-
-       def search_jobs(self):
-               # Find the jobs who need the update the most.
-               job_ids = []
-
-               # Get all jobs in new state, no matter how many these are.
-               query = self.db.query("SELECT id FROM jobs WHERE state = 'new'")
-               for job in query:
-                       job_ids.append(job.id)
-
-               # If there are no jobs to check, search deeper.
-               if not job_ids:
-                       query = self.db.query("SELECT id FROM jobs \
-                               WHERE state = 'dependency_error' AND time_finished < DATE_SUB(NOW(), INTERVAL 5 MINUTE) \
-                               ORDER BY time_finished LIMIT 50")
-
-                       for job in query:
-                               job_ids.append(job.id)
-
-               # Create a subprocess for every single job we have to work on.
-               for job_id in job_ids:
-                       process = multiprocessing.Process(
-                               target=self._process, args=(job_id,)
-                       )
-                       process.daemon = True
-                       self.processes.append(process)
-
-               # Start immediately again if we have running subprocesses.
-               if self.processes:              
-                       return 0
-
-       @staticmethod
-       def _process(job_id):
-               # Create a new pakfire instance.
-               _pakfire = main.Pakfire()
-
-               # Get the build job we are working on.
-               job = _pakfire.jobs.get_by_id(job_id)
-               assert job
-
-               # Check if the job status has changed in the meanwhile.
-               if not job.state in ("new", "dependency_error", "failed"):
-                       logging.warning("Job status has already changed: %s - %s" % (job.name, job.state))
-                       return
-
-               # Resolve the dependencies.
-               job.resolvdep()
-
-
-managers.append(DependencyChecker)
-
-
-class DeleteManager(Manager):
-       @property
-       def timeout(self):
-               return 300
-
-       def do(self):
-               self.pakfire.cleanup_files()
-
-
-managers.append(DeleteManager)
-
-class SessionsManager(Manager):
-       """
-               Cleans up sessions that are not valid anymore.
-               Keeps the database smaller.
-       """
-
-       @property
-       def timeout(self):
-               return 3600
-
-       def do(self):
-               self.pakfire.sessions.cleanup()
-
-
-managers.append(SessionsManager)
index c75e26c6a06754b39728b19174d96b7e8aa38822..b07652c4d08160236a42f7aecb73f130aae550db 100644 (file)
@@ -333,9 +333,9 @@ class Repository(base.Object):
 
                return _builds
 
-       def get_packages(self, arch):
+       def _get_packages(self, arch):
                if arch.name == "src":
-                       pkgs = self.db.query("SELECT packages.id AS id FROM packages \
+                       pkgs = self.db.query("SELECT packages.id AS id, packages.path AS path FROM packages \
                                JOIN builds ON builds.pkg_id = packages.id \
                                JOIN repositories_builds ON builds.id = repositories_builds.build_id \
                                WHERE packages.arch = %s AND repositories_builds.repo_id = %s",
@@ -345,7 +345,7 @@ class Repository(base.Object):
                        noarch = self.pakfire.arches.get_by_name("noarch")
                        assert noarch
 
-                       pkgs = self.db.query("SELECT packages.id AS id FROM packages \
+                       pkgs = self.db.query("SELECT packages.id AS id, packages.path AS path FROM packages \
                                JOIN jobs_packages ON jobs_packages.pkg_id = packages.id \
                                JOIN jobs ON jobs_packages.job_id = jobs.id \
                                JOIN builds ON builds.id = jobs.build_id \
@@ -354,7 +354,19 @@ class Repository(base.Object):
                                repositories_builds.repo_id = %s",
                                arch.id, noarch.id, self.id)
 
-               return sorted([packages.Package(self.pakfire, p.id) for p in pkgs])
+               return pkgs
+
+       def get_packages(self, arch):
+               pkgs = [packages.Package(self.pakfire, p.id) for p in self._get_packages(arch)]
+               pkgs.sort()
+
+               return pkgs
+
+       def get_paths(self, arch):
+               paths = [p.path for p in self._get_packages(arch)]
+               paths.sort()
+
+               return paths
 
        @property
        def packages(self):
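
Splitting the query into _get_packages() lets the new get_paths() return the raw path column without constructing a packages.Package object per row, which is cheaper when only file names are needed. A standalone sketch of the pattern (the row and repository classes are stand-ins, not backend code):

    class Row(object):
        # Stand-in for a database row with id and path columns.
        def __init__(self, id, path):
            self.id   = id
            self.path = path

    class RepositoryLike(object):
        def _get_packages(self, arch):
            # Stands in for the two SQL queries above.
            return [Row(2, "pool/b.pkg"), Row(1, "pool/a.pkg")]

        def get_paths(self, arch):
            # Only the path column is used; no Package objects are built.
            return sorted(p.path for p in self._get_packages(arch))

    assert RepositoryLike().get_paths(None) == ["pool/a.pkg", "pool/b.pkg"]
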
diff --git a/backend/scheduler.py b/backend/scheduler.py
new file mode 100644 (file)
index 0000000..80f5fc4
--- /dev/null
@@ -0,0 +1,83 @@
+#!/usr/bin/python
+
+import logging
+import time
+import traceback
+
+class Event(object):
+       interval = None
+
+       priority = 0
+
+       def __init__(self, *arguments):
+               self.arguments = arguments
+
+               self._next_start_time = 0
+
+               self.scheduler = None
+
+       def __repr__(self):
+               if hasattr(self, "_next_start_time"):
+                       return "<%s next_start_in=%ds>" % \
+                               (self.__class__.__name__, self._next_start_time - time.time())
+
+               return "<%s>" % self.__class__.__name__
+
+       def run(self, *args, **kwargs):
+               raise NotImplementedError
+
+
+class Scheduler(object):
+       def __init__(self):
+               self._queue = []
+
+       def add_event(self, event, start_time=None):
+               event.scheduler = self
+
+               self._queue.append(event)
+
+               # Set initial start time.
+               if start_time is None:
+                       start_time = time.time()
+
+               event._next_start_time = start_time
+
+       def sort_queue(self):
+               self._queue.sort(key=lambda e: (e.priority, e._next_start_time))
+
+       def run(self):
+               while self._queue:
+                       self.sort_queue()
+
+                       for event in self._queue:
+                               # Run the event once its start time has been
+                               # reached.
+                               if event._next_start_time <= time.time():
+                                       try:
+                                               logging.info("Running %s..." % event)
+
+                                               event.run(*event.arguments)
+
+                                       # In case the user interrupts the scheduler.
+                                       except KeyboardInterrupt:
+                                               # Stop immediately.
+                                               return
+
+                                       except:
+                                               traceback.print_exc()
+
+                                       finally:
+                                               # Set the next execution time if the event
+                                               # should be run again.
+                                               if event.interval:
+                                                       event._next_start_time = time.time() + event.interval
+
+                                               # Otherwise remove it from the queue.
+                                               else:
+                                                       self._queue.remove(event)
+
+                                       # Get back to outer loop and sort the queue again.
+                                       break
+
+                       # Sleep a bit.
+                       time.sleep(1)
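
A minimal sketch of how the scheduler is driven (the event class, its interval and the log message are made up for illustration; only Event and Scheduler above are real):

    import logging
    logging.basicConfig(level=logging.INFO)

    from backend.scheduler import Event, Scheduler

    class HeartbeatEvent(Event):
        # Re-run roughly every ten seconds (the loop polls once a second).
        interval = 10
        priority = 0

        def run(self):
            logging.info("tick")

    scheduler = Scheduler()
    scheduler.add_event(HeartbeatEvent())
    scheduler.run()   # blocks; KeyboardInterrupt stops the loop

Per-call arguments can be passed positionally to the Event constructor and arrive in run(); CheckBuildDependencyEvent in manager/builds.py below uses this for the job id.
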
diff --git a/manager/__init__.py b/manager/__init__.py
new file mode 100644 (file)
index 0000000..d3713a4
--- /dev/null
@@ -0,0 +1,29 @@
+#!/usr/bin/python
+
+import base
+
+from bugs         import BugsUpdateEvent
+from builds       import BuildsFailedRestartEvent, CheckBuildDependenciesEvent
+from builds       import CreateTestBuildsEvent
+from messages     import MessagesSendEvent
+from repositories import RepositoriesUpdateEvent
+from sessions     import SessionsCleanupEvent
+from sources      import SourcesPullEvent
+from uploads      import UploadsCleanupEvent
+
+
+# Events that do not fit anywhere else.
+
+class CleanupFilesEvent(base.Event):
+       """
+               Removes all files that are not needed anymore.
+               (scratch builds, logs, etc.)
+       """
+       # Run once every 5 minutes.
+       interval = 300
+
+       # Intermediate priority.
+       priority = 5
+
+       def run(self):
+               self.pakfire.cleanup_files()
diff --git a/manager/base.py b/manager/base.py
new file mode 100644 (file)
index 0000000..7d0b171
--- /dev/null
@@ -0,0 +1,13 @@
+#!/usr/bin/python
+
+import backend.scheduler
+
+class Event(backend.scheduler.Event):
+       def __init__(self, pakfire, *args, **kwargs):
+               backend.scheduler.Event.__init__(self, *args, **kwargs)
+
+               self.pakfire = pakfire
+
+       @property
+       def db(self):
+               return self.pakfire.db
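
The pakfire-manager script itself is listed among the changed files, but its diff is not shown here. Presumably it instantiates the backend, registers one instance of each event with the scheduler and enters the loop, roughly along these lines (a guess based on the classes above, not the actual script):

    import backend.main
    import backend.scheduler

    import manager

    pakfire   = backend.main.Pakfire()        # uses the new "pbs.conf" default
    scheduler = backend.scheduler.Scheduler()

    # Register one instance of each periodic event (selection abbreviated).
    for event_cls in (manager.MessagesSendEvent, manager.BugsUpdateEvent,
                      manager.SessionsCleanupEvent, manager.CleanupFilesEvent):
        scheduler.add_event(event_cls(pakfire))

    scheduler.run()
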
diff --git a/manager/bugs.py b/manager/bugs.py
new file mode 100644 (file)
index 0000000..10dcaa4
--- /dev/null
@@ -0,0 +1,39 @@
+#!/usr/bin/python
+
+import logging
+
+import base
+
+class BugsUpdateEvent(base.Event):
+       # User feedback gets a high priority.
+       priority = 1
+
+       @property
+       def interval(self):
+               return self.pakfire.settings.get_int("bugzilla_update_interval", 60)
+
+       def run(self):
+               # Get all pending updates.
+               query = self.db.query("SELECT * FROM builds_bugs_updates \
+                       WHERE error = 'N' ORDER BY time")
+
+               # XXX CHECK IF BZ IS ACTUALLY REACHABLE AND WORKING
+
+               for update in query:
+                       try:
+                               bug = self.pakfire.bugzilla.get_bug(update.bug_id)
+                               if not bug:
+                                       logging.error("Bug #%s does not exist." % update.bug_id)
+                                       continue
+
+                               # Set the changes.
+                               bug.set_status(update.status, update.resolution, update.comment)
+
+                       except Exception, e:
+                               # If there was an error, we save that and go on.
+                               self.db.execute("UPDATE builds_bugs_updates SET error = 'Y', error_msg = %s \
+                                       WHERE id = %s", "%s" % e, update.id)
+
+                       else:
+                               # Remove the update when it has been done successfully.
+                               self.db.execute("DELETE FROM builds_bugs_updates WHERE id = %s", update.id)
diff --git a/manager/builds.py b/manager/builds.py
new file mode 100644 (file)
index 0000000..ab1c5f6
--- /dev/null
@@ -0,0 +1,334 @@
+#!/usr/bin/python
+
+import datetime
+import logging
+import multiprocessing
+import os
+
+# Needed by DistManager below; assumes the backend package is importable
+# and that MAKEFILE_EXTENSION lives in backend.constants.
+import backend.git
+from backend.constants import *
+
+import base
+
+class BuildsFailedRestartEvent(base.Event):
+       # Run when idle.
+       priority = 5
+
+       @property
+       def interval(self):
+               return self.pakfire.settings.get_int("build_keepalive_interval", 900)
+
+       def run(self):
+               max_tries = self.pakfire.settings.get_int("builds_restart_max_tries", 9)
+
+               query = self.db.query("SELECT jobs.id AS id FROM jobs \
+                       JOIN builds ON builds.id = jobs.build_id \
+                       WHERE \
+                               jobs.type = 'build' AND \
+                               jobs.state = 'failed' AND \
+                               jobs.tries <= %s AND \
+                               NOT builds.state = 'broken' AND \
+                               jobs.time_finished < DATE_SUB(NOW(), INTERVAL 72 HOUR) \
+                       ORDER BY \
+                               CASE \
+                                       WHEN jobs.type = 'build' THEN 0 \
+                                       WHEN jobs.type = 'test'  THEN 1 \
+                               END, \
+                               builds.priority DESC, jobs.time_created ASC",
+                       max_tries)
+
+               for row in query:
+                       job = self.pakfire.jobs.get_by_id(row.id)
+
+                       # Restart the job.
+                       job.set_state("new", log=False)
+
+
+class CheckBuildDependenciesEvent(base.Event):
+       # Process them as quickly as possible, but there may be more important events.
+       priority = 3
+
+       @property
+       def interval(self):
+               return self.pakfire.settings.get_int("dependency_checker_interval", 30)
+
+       def run(self):
+               query = self.db.query("SELECT id FROM jobs \
+                       WHERE state = 'new' OR \
+                               (state = 'dependency_error' AND \
+                               time_finished < DATE_SUB(NOW(), INTERVAL 5 MINUTE)) \
+                       ORDER BY time_finished LIMIT 50")
+
+               for row in query:
+                       e = CheckBuildDependencyEvent(self.pakfire, row.id)
+                       self.scheduler.add_event(e)
+
+
+class CheckBuildDependencyEvent(base.Event):
+       # Process them as quickly as possible, but there may be more important events.
+       priority = 3
+
+       def run(self, job_id):
+               # Get the build job we are working on.
+               job = self.pakfire.jobs.get_by_id(job_id)
+               if not job:
+                       logging.debug("Job %s does not exist." % job_id)
+                       return
+
+               # Check if the job status has changed in the meanwhile.
+               if not job.state in ("new", "dependency_error", "failed"):
+                       logging.warning("Job status has already changed: %s - %s" % (job.name, job.state))
+                       return
+
+               # Resolve the dependencies.
+               job.resolvdep()
+
+
+class CreateTestBuildsEvent(base.Event):
+       # Run this every five minutes.
+       interval = 300
+
+       # Run when the build service is idle.
+       priority = 10
+
+       @property
+       def test_threshold(self):
+               threshold_days = self.pakfire.settings.get_int("test_threshold_days", 14)
+
+               return datetime.datetime.utcnow() - datetime.timedelta(days=threshold_days)
+
+       def run(self):
+               max_queue_length = self.pakfire.settings.get_int("test_queue_limit", 10)
+
+               # Get a list with all feasible architectures.
+               arches = self.pakfire.arches.get_all()
+               noarch = self.pakfire.arches.get_by_name("noarch")
+               if noarch:
+                       arches.append(noarch)
+
+               for arch in arches:
+                       # Get the job queue for each architecture.
+                       queue = self.pakfire.jobs.get_next(arches=[arch,])
+
+                       # Skip adding new jobs if there are already too many jobs in the queue.
+                       limit = max_queue_length - len(queue)
+                       if limit <= 0:
+                               logging.debug("Already too many jobs in queue of %s to create tests." % arch.name)
+                               continue
+
+                       # Get a list of builds that potentially need a test
+                       # build. Do not return more jobs than we are allowed
+                       # to put into the build queue.
+                       builds = self.pakfire.builds.needs_test(self.test_threshold,
+                               arch=arch, limit=limit)
+
+                       if not builds:
+                               logging.debug("No builds need a test for %s." % arch.name)
+                               continue
+
+                       # Search for the job with the right architecture in each
+                       # build and schedule a test job.
+                       for build in builds:
+                               for job in build.jobs:
+                                       if job.arch == arch:
+                                               job.schedule("test")
+                                               break
+
+
+class DistManager(object):
+       process = None
+
+       first_run = True
+
+       def get_next_commit(self):
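+               # Return the oldest pending commit or None if there are no
+               # pending commits at all.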
+               commits = self.pakfire.sources.get_pending_commits(limit=1)
+
+               if not commits:
+                       return
+
+               return commits[0]
+
+       @property
+       def timeout(self):
+               # If there are commits waiting in the queue, we try to restart
+               # as soon as possible.
+               if self.get_next_commit():
+                       return 0
+
+               # Otherwise we wait at least for a minute.
+               return 60
+
+       def do(self):
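+               # The return value of this method is the number of seconds to
+               # wait before do() is called again.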
+               if self.first_run:
+                       self.first_run = False
+
+                       self.process = self.init_repos()
+
+               if self.process:
+                       # If the process is still running, we check back in a
+                       # second.
+                       if self.process.is_alive():
+                               return 1
+
+                       # The process has finished its work. Clean everything up
+                       # and move on.
+                       self.commit = self.process = None
+
+               # Search for a new commit to proceed with.
+               self.commit = commit = self.get_next_commit()
+
+               # If no commit is there, we just wait for a minute.
+               if not commit:
+                       return 60
+
+               # Got a commit to process.
+               commit.state = "running"
+
+               logging.debug("Processing commit %s: %s" % (commit.revision, commit.subject))
+
+               # Get the repository of this commit.
+               repo = backend.git.Repo(self.pakfire, commit.source_id)
+
+               # Make sure it is checked out.
+               if not repo.cloned:
+                       repo.clone()
+
+               # Navigate to the right revision.
+               repo.checkout(commit.revision)
+
+               # Get all changed makefiles.
+               deleted_files = []
+               updated_files = []
+
+               for file in repo.changed_files(commit.revision):
+                       # Skip all files that are not makefiles.
+                       if not file.endswith(".%s" % MAKEFILE_EXTENSION):
+                               continue
+
+                       if os.path.exists(file):
+                               updated_files.append(file)
+                       else:
+                               deleted_files.append(file)
+
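+               # Process the commit in a separate child process, presumably so
+               # that a crashing or hanging dist run cannot take down the
+               # manager itself.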
+               self.process = self.fork(commit_id=commit.id, updated_files=updated_files,
+                       deleted_files=deleted_files)
+
+               return 1
+
+       def fork(self, source_id=None, commit_id=None, updated_files=None, deleted_files=None):
+               # Create the Process object.
+               process = multiprocessing.Process(
+                       target=self._process,
+                       args=(source_id, commit_id, updated_files or [], deleted_files or [])
+               )
+
+               # Run the child in daemon mode, so it is terminated
+               # automatically when the manager exits.
+               process.daemon = True
+
+               # Start the process.
+               process.start()
+               logging.debug("Started new process pid=%s." % process.pid)
+
+               return process
+
+       def init_repos(self):
+               # Create the Process object.
+               process = multiprocessing.Process(
+                       target=self._init_repos,
+               )
+
+               # Unlike fork(), this process must not run in daemon mode,
+               # because it forks child processes of its own, which daemonic
+               # processes are not allowed to do.
+               #process.daemon = True
+
+               # Start the process.
+               process.start()
+               logging.debug("Started new process pid=%s." % process.pid)
+
+               return process
+
+       def _init_repos(self):
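+               # Initial import: clone every source repository that has no
+               # revision imported yet and dist each of its makefiles once.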
+               _pakfire = main.Pakfire()
+               sources = _pakfire.sources.get_all()
+
+               for source in sources:
+                       if source.revision:
+                               continue
+
+                       repo = backend.git.Repo(_pakfire, source.id)
+                       if not repo.cloned:
+                               repo.clone()
+
+                       files = repo.get_all_files()
+
+                       for file in files:
+                               if not file.endswith(".%s" % MAKEFILE_EXTENSION):
+                                       continue
+
+                               # Dist one makefile at a time and wait for the child
+                               # process to finish before starting the next one.
+                               process = self.fork(source_id=source.id, updated_files=[file,], deleted_files=[])
+
+                               while process.is_alive():
+                                       time.sleep(1)
+
+       @staticmethod
+       def _process(source_id, commit_id, updated_files, deleted_files):
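+               # This runs in the forked child process, so it creates its own
+               # Pakfire instance (and thus its own database connection).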
+               _pakfire = main.Pakfire()
+
+               commit = None
+               source = None
+
+               if commit_id:
+                       commit = _pakfire.sources.get_commit_by_id(commit_id)
+                       assert commit
+
+                       source = commit.source
+
+               if source_id and not source:
+                       source = _pakfire.sources.get_by_id(source_id)
+
+               assert source
+
+               if updated_files:
+                       # Create a temporary directory to put all the files
+                       # that are generated here.
+                       pkg_dir = tempfile.mkdtemp()
+
+                       try:
+                               config = pakfire.config.Config(["general.conf",])
+                               config.parse(source.distro.get_config())
+
+                               p = pakfire.PakfireServer(config=config)
+
+                               pkgs = []
+                               for file in updated_files:
+                                       pkg_file = p.dist(file, pkg_dir)
+                                       pkgs.append(pkg_file)
+
+                               # Import all packages in one swoop.
+                               for pkg in pkgs:
+                                       # Import the package file and create a build out of it.
+                                       builds.import_from_package(_pakfire, pkg,
+                                               distro=source.distro, commit=commit, type="release")
+
+                       except:
+                               if commit:
+                                       commit.state = "failed"
+
+                               raise
+
+                       finally:
+                               if os.path.exists(pkg_dir):
+                                       shutil.rmtree(pkg_dir)
+
+               for file in deleted_files:
+                       # Determine the name of the package from the makefile path.
+                       name = os.path.basename(file)
+                       name = name[:-(len(MAKEFILE_EXTENSION) + 1)]
+
+                       if commit:
+                               commit.distro.delete_package(name)
+
+               if commit:
+                       commit.state = "finished"
diff --git a/manager/messages.py b/manager/messages.py
new file mode 100644 (file)
index 0000000..ade1544
--- /dev/null
@@ -0,0 +1,22 @@
+#!/usr/bin/python
+
+import logging
+
+import base
+
+class MessagesSendEvent(base.Event):
+       # Emails should be sent out as quickly as possible.
+       priority = 0
+
+       @property
+       def interval(self):
+               return self.pakfire.settings.get_int("messages_interval", 10)
+
+       def run(self):
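+               # Try to send all queued messages. A message that cannot be
+               # sent stays in the database and is retried on the next run.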
+               for msg in self.pakfire.messages.get_all():
+                       try:
+                               self.pakfire.messages.send_msg(msg)
+
+                       except Exception:
+                               logging.exception("Could not send message %s." % msg.id)
+                               continue
+
+                       # If everything was okay, we can delete the message in the database.
+                       self.pakfire.messages.delete(msg.id)
diff --git a/manager/repositories.py b/manager/repositories.py
new file mode 100644 (file)
index 0000000..b6c5c9a
--- /dev/null
@@ -0,0 +1,124 @@
+#!/usr/bin/python
+
+import logging
+import os
+import pakfire
+
+import base
+
+from backend.constants import *
+
+class RepositoriesUpdateEvent(base.Event):
+       priority = 6
+
+       @property
+       def interval(self):
+               return self.pakfire.settings.get_int("repository_update_interval", 600)
+
+       def run(self):
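+               # Fan out one RepositoryUpdateEvent for every repository that
+               # needs an update.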
+               for distro in self.pakfire.distros.get_all():
+                       for repo in distro.repositories:
+                               # Skip repositories that do not need an update at all.
+                               if not repo.needs_update():
+                                       logging.info("Repository %s - %s is already up to date." % (distro.name, repo.name))
+                                       continue
+
+                               e = RepositoryUpdateEvent(self.pakfire, repo.id)
+                               self.scheduler.add_event(e)
+
+
+class RepositoryUpdateEvent(base.Event):
+       # This is an important task, but it may take a while to process.
+       priority = 5
+
+       def run(self, repo_id):
+               repo = self.pakfire.repos.get_by_id(repo_id)
+               assert repo
+
+               logging.info("Going to update repository %s..." % repo.name)
+
+               # Record the time at which this update was started.
+               repo.updated()
+
+               # Find out for which arches this repository is created.
+               arches = repo.arches
+
+               # Add the source repository.
+               arches.append(self.pakfire.arches.get_by_name("src"))
+
+               for arch in arches:
+                       changed = False
+
+                       # Get all package paths that are to be included in this repository.
+                       paths = repo.get_paths(arch)
+
+                       repo_path = os.path.join(
+                               REPOS_DIR,
+                               repo.distro.identifier,
+                               repo.identifier,
+                               arch.name
+                       )
+
+                       if not os.path.exists(repo_path):
+                               os.makedirs(repo_path)
+
+                       source_files = []
+                       remove_files = []
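+                       # First mark every file that is currently in the repository
+                       # directory for removal. Files that are still referenced are
+                       # taken off that list below.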
+
+                       for filename in os.listdir(repo_path):
+                               path = os.path.join(repo_path, filename)
+
+                               if not os.path.isfile(path):
+                                       continue
+
+                               remove_files.append(path)
+
+                       for path in paths:
+                               filename = os.path.basename(path)
+
+                               source_file = os.path.join(PACKAGES_DIR, path)
+                               target_file = os.path.join(repo_path, filename)
+
+                               # Do not add the same file twice.
+                               if source_file in source_files:
+                                       continue
+
+                               source_files.append(source_file)
+
+                               try:
+                                       remove_files.remove(target_file)
+                               except ValueError:
+                                       changed = True
+
+                       if remove_files:
+                               changed = True
+
+                       # If nothing in the repository data has changed, there
+                       # is nothing to do.
+                       if changed:
+                               logging.info("The repository has updates...")
+                       else:
+                               logging.info("Nothing to update.")
+                               continue
+
+                       # Find the key to sign the package.
+                       key_id = None
+                       if repo.key:
+                               key_id = repo.key.fingerprint
+
+                       # Create package index.
+                       p = pakfire.PakfireServer(arch=arch.name)
+
+                       p.repo_create(repo_path, source_files,
+                               name="%s - %s.%s" % (repo.distro.name, repo.name, arch.name),
+                               key_id=key_id, type=arch.build_type)
+
+                       # Remove all obsolete files afterwards. remove_files
+                       # already contains absolute paths.
+                       for file in remove_files:
+
+                               try:
+                                       os.remove(file)
+                               except OSError:
+                                       logging.warning("Could not remove %s." % file)
+
diff --git a/manager/sessions.py b/manager/sessions.py
new file mode 100644 (file)
index 0000000..283a756
--- /dev/null
@@ -0,0 +1,17 @@
+#!/usr/bin/python
+
+import base
+
+class SessionsCleanupEvent(base.Event):
+       """
+               Cleans up sessions that are not valid anymore.
+               Keeps the database smaller.
+       """
+       # Run once an hour.
+       interval = 3600
+
+       # Rather unimportant when this runs.
+       priority = 10
+
+       def run(self):
+               self.pakfire.sessions.cleanup()
diff --git a/manager/sources.py b/manager/sources.py
new file mode 100644 (file)
index 0000000..39b0ed7
--- /dev/null
@@ -0,0 +1,30 @@
+#!/usr/bin/python
+
+import backend.git
+
+import base
+
+class SourcesPullEvent(base.Event):
+       # This should run whenever possible, so that users see their commits
+       # in the build service as quickly as possible.
+       priority = 1
+
+       @property
+       def interval(self):
+               return self.pakfire.settings.get_int("source_update_interval", 60)
+
+       def run(self):
+               for source in self.pakfire.sources.get_all():
+                       repo = backend.git.Repo(self.pakfire, source.id, mode="mirror")
+
+                       # If the repository is not yet cloned, we need to make a local
+                       # clone to work with.
+                       if not repo.cloned:
+                               repo.clone()
+
+                       # Otherwise we just fetch updates.
+                       else:
+                               repo.fetch()
+
+                       # Import all new revisions.
+                       repo.import_revisions()
diff --git a/manager/uploads.py b/manager/uploads.py
new file mode 100644 (file)
index 0000000..2cbfe9d
--- /dev/null
@@ -0,0 +1,12 @@
+#!/usr/bin/python
+
+import base
+
+class UploadsCleanupEvent(base.Event):
+       interval = 3600
+
+       # Rather unimportant when this runs.
+       priority = 10
+
+       def run(self):
+               self.pakfire.uploads.cleanup()
index 956f9c53a804070aed3fa92edab3de1a6a1eaff9..b84d14634efbab59ab72ce4f0e6dc6dde78ea508 100644 (file)
@@ -1,54 +1,39 @@
 #!/usr/bin/python
 
-import logging
-import os.path
-import time
-import tornado.ioloop
-import tornado.options
+import backend.main
+import backend.scheduler
 
-import backend
-from backend.managers import *
+import manager
 
+# Use tornado's logging options.
+import tornado.options
 tornado.options.parse_command_line()
 
-BASEDIR = os.path.dirname(__file__)
-
-class Daemon(object):
-       def __init__(self):
-               self._managers = []
-
-               self.ioloop.set_blocking_log_threshold(300)
-
-               config_file = os.path.join(BASEDIR, "pbs.conf")
-               self.pakfire = backend.Pakfire(config_file=config_file)
-
-       @property
-       def ioloop(self):
-               return tornado.ioloop.IOLoop.instance()
-
-       def add(self, manager_cls):
-               logging.info("Registering new manager: %s" % manager_cls.__name__)
-               manager = manager_cls(self.pakfire)
-               self._managers.append(manager)
-
-       def run(self):
-               """
-                       Main loop.
-               """
-               for manager in self._managers:
-                       manager.pc.start()
-
-               self.ioloop.start()
-
-       def shutdown(self):
-               self.ioloop.stop()
-
-
-
-
-if __name__ == "__main__":
-       d = Daemon()
-       for manager in managers:
-               d.add(manager)
-
-       d.run()
+# Create the Scheduler instance.
+s = backend.scheduler.Scheduler()
+
+# Create Pakfire instance.
+p = backend.main.Pakfire()
+
+events = (
+       manager.BugsUpdateEvent,
+       manager.BuildsFailedRestartEvent,
+       manager.CheckBuildDependenciesEvent,
+       manager.CleanupFilesEvent,
+       manager.CreateTestBuildsEvent,
+       manager.MessagesSendEvent,
+       manager.RepositoriesUpdateEvent,
+       manager.SessionsCleanupEvent,
+       manager.SourcesPullEvent,
+       manager.UploadsCleanupEvent,
+)
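+# (The order of this tuple should not matter; the scheduler is expected
+# to order events by their priority and interval attributes.)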
+
+# Add all events to the scheduler.
+for e in events:
+       i = e(p)
+       s.add_event(i)
+
+# Run the scheduler.
+s.run()
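
With this structure, adding a new periodic task to the daemon means writing one small Event subclass and registering it in the events tuple above. A minimal sketch, following the pattern of SessionsCleanupEvent and UploadsCleanupEvent; the class name ExampleCleanupEvent and the cleanup() call are made up for illustration and are not part of this codebase:

#!/usr/bin/python

import base

class ExampleCleanupEvent(base.Event):
        # Run once a day.
        interval = 86400

        # Rather unimportant when this runs.
        priority = 10

        def run(self):
                # Hypothetical cleanup hook -- not a real API of this codebase.
                self.pakfire.cleanup()

In pakfire-manager, the new class would then simply be appended to the events tuple, e.g. events = (..., manager.ExampleCleanupEvent,).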