git.ipfire.org Git - pakfire.git/commitdiff
Update pakfire-daemon:
author: Michael Tremer <michael.tremer@ipfire.org>
Thu, 28 Feb 2013 11:35:33 +0000 (12:35 +0100)
committer: Michael Tremer <michael.tremer@ipfire.org>
Thu, 28 Feb 2013 11:35:33 +0000 (12:35 +0100)
Build some sort of preforked worker model like Apache does.

The XMLRPC backend has been replaced by a proper API for
which urlgrabber is used.

14 files changed:
Makeconfig
python/pakfire/cli.py
python/pakfire/client.py [new file with mode: 0644]
python/pakfire/client/__init__.py [deleted file]
python/pakfire/client/base.py [deleted file]
python/pakfire/client/builder.py [deleted file]
python/pakfire/client/transport.py [deleted file]
python/pakfire/config.py
python/pakfire/daemon.py [new file with mode: 0644]
python/pakfire/distro.py
python/pakfire/downloader.py
python/pakfire/errors.py
python/pakfire/system.py
python/pakfire/transport.py [new file with mode: 0644]

index e717238f696a2c696ee160f9248e0d92918f0fc2..8354a692757f8f01e5ab76b7a18c2a18e8a506b5 100644 (file)
@@ -33,7 +33,7 @@ endif
 
 PYTHON_CC      = $(CC) -pthread -fPIC
 PYTHON_CFLAGS  = $(shell python-config --cflags)
-PYTHON_MODULES = pakfire pakfire/client pakfire/packages pakfire/repository
+PYTHON_MODULES = pakfire pakfire/packages pakfire/repository
 ifeq "$(DEBIAN)" "1"
        PYTHON_DIR = $(LIBDIR)/python$(PYTHON_VERSION)/dist-packages
 else
index 615aacdba4842d3c7d7868a51cffb5c3dc81ea6b..232aad87329a1e871da8297a6f47b1647cd5c4a9 100644 (file)
@@ -29,6 +29,7 @@ import tempfile
 import base
 import client
 import config
+import daemon
 import logger
 import packages
 import repository
@@ -879,15 +880,11 @@ class CliClient(Cli):
                        "test"        : self.handle_test,
                }
 
-               # Read configuration for the pakfire client.
+               # Read configuration.
                self.config = config.ConfigClient()
 
                # Create connection to pakfire hub.
-               self.client = client.PakfireUserClient(
-                       self.config.get("client", "server"),
-                       self.config.get("client", "username"),
-                       self.config.get("client", "password"),
-               )
+               self.client = client.PakfireClient(self.config)
 
        @property
        def pakfire_args(self):
@@ -989,21 +986,23 @@ class CliClient(Cli):
 
                        # Format arches.
                        if self.args.arch:
-                               arches = self.args.arch.replace(",", " ")
+                               arches = self.args.arch.split(",")
                        else:
                                arches = None
 
                        # Create a new build on the server.
-                       build = self.client.build_create(package, arches=arches)
-
-                       # XXX Print the resulting build.
-                       print build
+                       build_id = self.client.build_create(package, build_type="scratch",
+                               arches=arches)
 
                finally:
                        # Cleanup the temporary directory and all files.
                        if os.path.exists(temp_dir):
                                shutil.rmtree(temp_dir, ignore_errors=True)
 
+               # Monitor the build.
+               if build_id:
+                       self.watch_build(build_id)
+
        def handle_info(self):
                ret = []
 
@@ -1183,6 +1182,11 @@ class CliClient(Cli):
                res = self.client.test_code(error_code)
                print _("Reponse from the server: %s") % res
 
+       def watch_build(self, build_id):
+               print self.client.build_get(build_id)
+               # XXX TODO
+               print build_id
+
 
 class CliDaemon(Cli):
        def __init__(self):
@@ -1200,10 +1204,11 @@ class CliDaemon(Cli):
                        Runs the pakfire daemon with provided settings.
                """
                # Read the configuration file for the daemon.
-               conf = config.ConfigDaemon()
+               self.config = config.ConfigDaemon()
+               logger.setup_logging(self.config)
 
                # Create daemon instance.
-               d = client.PakfireDaemon()
+               d = daemon.PakfireDaemon(self.config)
                try:
                        d.run()
 
diff --git a/python/pakfire/client.py b/python/pakfire/client.py
new file mode 100644 (file)
index 0000000..c1f7ba6
--- /dev/null
@@ -0,0 +1,83 @@
+#!/usr/bin/python
+###############################################################################
+#                                                                             #
+# Pakfire - The IPFire package management system                              #
+# Copyright (C) 2013 Pakfire development team                                 #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+###############################################################################
+
+import transport
+
+from pakfire.constants import *
+from pakfire.i18n import _
+
+import logging
+log = logging.getLogger("pakfire.client")
+
+class PakfireClient(object):
+       def __init__(self, config):
+               self.config = config
+
+               # Create connection to the hub.
+               self.transport = transport.PakfireHubTransport(self.config)
+
+       def build_create(self, *args, **kwargs):
+               return self.transport.build_create(*args, **kwargs)
+
+       def build_get(self, *args, **kwargs):
+               return self.transport.build_get(*args, **kwargs)
+
+       def job_get(self, *args, **kwargs):
+               return self.transport.job_get(*args, **kwargs)
+
+       def package_get(self, *args, **kwargs):
+               return self.transport.package_get(*args, **kwargs)
+
+# XXX OLD CODE
+
+class PakfireUserClient(PakfireClient):
+       type = "user"
+
+       def check_auth(self):
+               """
+                       Check if the user was successfully authenticated.
+               """
+               return self.conn.check_auth()
+
+       def get_user_profile(self):
+               """
+                       Get information about the user profile.
+               """
+               return self.conn.get_user_profile()
+
+       def get_builds(self, type=None, limit=10, offset=0):
+               return self.conn.get_builds(type=type, limit=limit, offset=offset)
+
+       def get_build(self, build_id):
+               return self.conn.get_build(build_id)
+
+       def get_builder(self, builder_id):
+               return self.conn.get_builder(builder_id)
+
+       def get_job(self, job_id):
+               return self.conn.get_job(job_id)
+
+       def get_latest_jobs(self):
+               return self.conn.get_latest_jobs()
+
+       def get_active_jobs(self):
+               return self.conn.get_active_jobs()
+
diff --git a/python/pakfire/client/__init__.py b/python/pakfire/client/__init__.py
deleted file mode 100644 (file)
index 64adfd0..0000000
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/usr/bin/python
-
-from base import PakfireUserClient, PakfireBuilderClient
-from builder import PakfireDaemon, ClientBuilder
diff --git a/python/pakfire/client/base.py b/python/pakfire/client/base.py
deleted file mode 100644 (file)
index cd661ca..0000000
+++ /dev/null
@@ -1,249 +0,0 @@
-#!/usr/bin/python
-
-from __future__ import division
-
-import os
-import socket
-import urlparse
-import xmlrpclib
-
-import pakfire.util
-import pakfire.packages as packages
-from pakfire.system import system
-
-# Local modules.
-import transport
-
-from pakfire.constants import *
-from pakfire.i18n import _
-
-import logging
-log = logging.getLogger("pakfire.client")
-
-class PakfireClient(object):
-       type = None
-
-       def __init__(self, server, username, password):
-               self.url = self._join_url(server, username, password)
-
-               # Create a secure XMLRPC connection to the server.
-               self.conn = transport.Connection(self.url)
-
-       def _join_url(self, server, username, password):
-               """
-                       Construct a right URL out of the given
-                       server, username and password.
-
-                       Basicly this just adds the credentials
-                       to the URL.
-               """
-               assert self.type
-
-               # Parse the given URL.
-               url = urlparse.urlparse(server)
-               assert url.scheme in ("http", "https")
-
-               # Build new URL.
-               ret = "%s://" % url.scheme
-
-               # Add credentials if provided.
-               if username and password:
-                       ret += "%s:%s@" % (username, password)
-
-               # Add host and path components.
-               ret += "/".join((url.netloc, self.type))
-
-               return ret
-
-       ### Misc. actions
-
-       def noop(self):
-               """
-                       No operation. Just to check if the connection is
-                       working. Returns a random number.
-               """
-               return self.conn.noop()
-
-       def test_code(self, error_code):
-               assert error_code >= 100 and error_code <= 999
-
-               return self.conn.test_code(error_code)
-
-       def get_my_address(self):
-               """
-                       Get my own address (as seen by the hub).
-               """
-               return self.conn.get_my_address()
-
-       def get_hub_status(self):
-               """
-                       Get some status information about the hub.
-               """
-               return self.conn.get_hub_status()
-
-
-class BuildMixin(object):
-       ### Build actions
-
-       def build_create(self, filename, arches=None, distro=None):
-               """
-                       Create a new build on the hub.
-               """
-
-               # Upload the source file to the server.
-               upload_id = self._upload_file(filename)
-
-               # Then create the build.
-               build = self.conn.build_create(upload_id, distro, arches)
-
-               print build
-
-       def _upload_file(self, filename):
-               # Get the hash of the file.
-               hash = pakfire.util.calc_hash1(filename)
-
-               # Get the size of the file.
-               size = os.path.getsize(filename)
-
-               # Get an upload ID from the server.
-               upload_id = self.conn.upload_create(os.path.basename(filename),
-                       size, hash)
-
-               # Make a nice progressbar.
-               pb = pakfire.util.make_progress(os.path.basename(filename), size, speed=True, eta=True)
-
-               try:
-                       # Calculate the number of chunks.
-                       chunks = (size // CHUNK_SIZE) + 1
-                       transferred = 0
-
-                       # Cut the file in pieces and upload them one after another.
-                       with open(filename) as f:
-                               chunk = 0
-                               while True:
-                                       data = f.read(CHUNK_SIZE)
-                                       if not data:
-                                               break
-
-                                       chunk += 1
-                                       if pb:
-                                               transferred += len(data)
-                                               pb.update(transferred)
-
-                                       data = xmlrpclib.Binary(data)
-                                       self.conn.upload_chunk(upload_id, data)
-
-                       # Tell the server, that we finished the upload.
-                       ret = self.conn.upload_finished(upload_id)
-
-               except:
-                       # If anything goes wrong, try to delete the upload and raise
-                       # the exception.
-                       self.conn.upload_remove(upload_id)
-
-                       raise
-
-               finally:
-                       if pb:
-                               pb.finish()
-
-               # If the server sends false, something happened with the upload that
-               # could not be recovered.
-               if not ret:
-                       logging.error("Upload of %s was not successful." % filename)
-                       raise Exception, "Upload failed."
-
-               return upload_id
-
-
-class PakfireUserClient(BuildMixin, PakfireClient):
-       type = "user"
-
-       def check_auth(self):
-               """
-                       Check if the user was successfully authenticated.
-               """
-               return self.conn.check_auth()
-
-       def get_user_profile(self):
-               """
-                       Get information about the user profile.
-               """
-               return self.conn.get_user_profile()
-
-       def get_builds(self, type=None, limit=10, offset=0):
-               return self.conn.get_builds(type=type, limit=limit, offset=offset)
-
-       def get_build(self, build_id):
-               return self.conn.get_build(build_id)
-
-       def get_builder(self, builder_id):
-               return self.conn.get_builder(builder_id)
-
-       def get_job(self, job_id):
-               return self.conn.get_job(job_id)
-
-       def get_latest_jobs(self):
-               return self.conn.get_latest_jobs()
-
-       def get_active_jobs(self):
-               return self.conn.get_active_jobs()
-
-
-class PakfireBuilderClient(BuildMixin, PakfireClient):
-       type = "builder"
-
-       def send_keepalive(self, force=False, overload=None, free_space=None):
-               """
-                       Sends a little keepalive to the server and
-                       updates the hardware information if the server
-                       requests it.
-               """
-               log.debug("Sending keepalive to the hub.")
-
-               # Collect the current loadavg and send it to the hub.
-               loadavg = ", ".join(("%.2f" % round(l, 2) for l in os.getloadavg()))
-
-               try:
-                       needs_update = self.conn.send_keepalive(loadavg, overload, free_space)
-
-               except XMLRPCInternalServerError:
-                       # If the keepalive message could not successfully be sent, we don't
-                       # bother, because the client will soon retry.
-                       log.warning(_("Could not send a keepalive message to the hub."))
-
-                       return
-
-               if force or needs_update:
-                       log.debug("The hub is requesting an update.")
-                       self.send_update()
-
-       def send_update(self):
-               log.info("Sending host information update to hub...")
-
-               config = pakfire.config.ConfigDaemon()
-
-               try:
-                       self.conn.send_update(
-                               # Supported architectures.
-                               system.supported_arches,
-
-                               # CPU information.
-                               system.cpu_model,
-                               system.cpu_count,
-
-                               # Amount of memory in bytes.
-                               system.memory / 1024,
-
-                               # Send the currently running version of Pakfire.
-                               PAKFIRE_VERSION,
-
-                               # Send the host key.
-                               config.get("signatures", "host_key", None),
-                       )
-
-               except XMLRPCInternalServerError:
-                       # Don't give a shit either.
-                       log.warning(_("Could not update the host information."))
-
-                       return
diff --git a/python/pakfire/client/builder.py b/python/pakfire/client/builder.py
deleted file mode 100644 (file)
index 9aefa1c..0000000
+++ /dev/null
@@ -1,497 +0,0 @@
-#!/usr/bin/python
-
-import hashlib
-import multiprocessing
-import os
-import sys
-import tempfile
-import time
-
-import pakfire.base
-import pakfire.builder
-import pakfire.config
-import pakfire.downloader
-import pakfire.system
-import pakfire.util
-from pakfire.system import system
-
-import base
-
-from pakfire.constants import *
-from pakfire.i18n import _
-
-import logging
-log = logging.getLogger("pakfire.client")
-
-def fork_builder(*args, **kwargs):
-       """
-               Wrapper that runs ClientBuilder in a new process and catches
-               any exception to report it to the main process.
-       """
-       try:
-               # Create new instance of the builder.
-               cb = ClientBuilder(*args, **kwargs)
-
-               # Run the build:
-               cb.build()
-
-       except Exception, e:
-               # XXX catch the exception and log it.
-               print e
-
-               # End the process with an exit code.
-               sys.exit(1)
-
-
-class PakfireDaemon(object):
-               """
-                       The PakfireDaemon class that creates a a new process per build
-                       job and also handles the keepalive/abort stuff.
-               """
-               def __init__(self):
-                       self.config = pakfire.config.ConfigDaemon()
-
-                       server   = self.config.get("daemon", "server")
-                       hostname = self.config.get("daemon", "hostname")
-                       secret   = self.config.get("daemon", "secret")
-
-                       self.client = base.PakfireBuilderClient(server, hostname, secret)
-                       self.conn   = self.client.conn
-
-                       # Save login data (to create child processes).
-                       self.server = server
-                       self.hostname = hostname
-                       self.__secret = secret
-
-                       # A list with all running processes.
-                       self.processes = []
-                       self.pid2jobid = {}
-
-                       # Save when last keepalive was sent.
-                       self._last_keepalive = 0
-
-                       # Send an initial keepalive message.
-                       self.send_keepalive(force=True)
-
-               def run(self, heartbeat=1, max_processes=None):
-                       # By default do not start more than two processes per CPU core.
-                       if max_processes is None:
-                               max_processes = system.cpu_count * 2
-                       log.debug("Maximum number of simultaneous processes is: %s" % max_processes)
-
-                       # Indicates when to try to request a new job or aborted builds.
-                       last_job_request = 0
-                       last_abort_request = 0
-
-                       # Main loop.
-                       while True:
-                               # Send the keepalive regularly.
-                               self.send_keepalive()
-
-                               # Remove all finished builds.
-                               # "removed" indicates, if a process has actually finished.
-                               removed = self.remove_finished_builders()
-
-                               # If a build slot was freed, search immediately for a new job.
-                               if removed:
-                                       last_job_request = 0
-
-                               # Kill aborted jobs.
-                               if time.time() - last_abort_request >= 60:
-                                       aborted = self.kill_aborted_jobs()
-
-                                       # If a build slot was freed, search immediately for a new job.
-                                       if aborted:
-                                               last_job_request = 0
-
-                                       last_abort_request = time.time()
-
-                               # Check if the maximum number of processes was reached.
-                               # Actually the hub does manage this but this is an emergency
-                               # condition if anything goes wrong.
-                               if self.num_processes >= max_processes:
-                                       log.debug("Reached maximum number of allowed processes (%s)." % max_processes)
-
-                                       time.sleep(heartbeat)
-                                       continue
-
-                               # Get new job.
-                               if time.time() - last_job_request >= 60 and not self.has_overload():
-                                       # If the last job request is older than a minute and we don't
-                                       # have too much load, we go and check if there is something
-                                       # to do for us.
-                                       job = self.get_job()
-
-                                       # If we got a job, we start a child process to work on it.
-                                       if job:
-                                               log.debug("Got a new job.")
-                                               self.fork_builder(job)
-                                       else:
-                                               log.debug("No new job.")
-
-                                       # Update the time when we requested a job.
-                                       last_job_request = time.time()
-
-                               # Wait a moment before starting over.
-                               time.sleep(heartbeat)
-
-               def shutdown(self):
-                       """
-                               Shut down the daemon.
-                               This means to kill all child processes.
-
-                               The method blocks until all processes are shut down.
-                       """
-                       for process in self.processes:
-                               log.info("Sending %s to terminate..." % process)
-
-                               process.terminate()
-                       else:
-                               log.info("No processes to kill. Shutting down immediately.")
-
-                       while self.processes:
-                               log.debug("%s process(es) is/are still running..." % len(self.processes))
-
-                               for process in self.processes[:]:
-                                       if not process.is_alive():
-                                               # The process has terminated.
-                                               log.info("Process %s terminated with exit code: %s" % \
-                                                       (process, process.exitcode))
-
-                                               self.processes.remove(process)
-
-               @property
-               def num_processes(self):
-                       # Return the number of processes.
-                       return len(self.processes)
-
-               @property
-               def free_space(self):
-                       mp = system.get_mountpoint(BUILD_ROOT)
-
-                       return mp.space_left
-
-               def get_job(self):
-                       """
-                               Get a build job from the hub.
-                       """
-                       if not self.free_space >= 2 * 1024**3:
-                               log.warning(_("Less than 2GB of free space. Cannot request a new job."))
-                               return
-
-                       log.info("Requesting a new job from the server...")
-
-                       # Get some information about this system.
-                       s = pakfire.system.System()
-
-                       # Fetch a build job from the hub.
-                       return self.client.conn.build_get_job(s.supported_arches)
-
-               def has_overload(self):
-                       """
-                               Checks, if the load average is not too high.
-
-                               On this is to be decided if a new job is taken.
-                       """
-                       try:
-                               load1, load5, load15 = os.getloadavg()
-                       except OSError:
-                               # Could not determine the current loadavg. In that case we
-                               # assume that we don't have overload.
-                               return False
-
-                       # If there are more than 2 processes in the process queue per CPU
-                       # core we will assume that the system has heavy load and to not request
-                       # a new job.
-                       return load5 >= system.cpu_count * 2
-
-               def send_keepalive(self, force=False):
-                       """
-                               When triggered, this method sends a keepalive to the hub.
-                       """
-                       # Do not send a keepalive more often than twice a minute.
-                       if time.time() - self._last_keepalive < 30:
-                               return
-
-                       kwargs = {
-                               "force"      : force,
-                               "overload"   : self.has_overload(),
-                               "free_space" : self.free_space / 1024**2,
-                       }
-
-                       self.client.send_keepalive(**kwargs)
-                       self._last_keepalive = time.time()
-
-               def remove_finished_builders(self):
-                       # Return if any processes have been removed.
-                       ret = False
-
-                       # Search for any finished processes.
-                       for process in self.processes[:]:
-                               # If the process is not alive anymore...
-                               if not process.is_alive():
-                                       ret = True
-
-                                       # ... check the exit code and log a message on errors.
-                                       if process.exitcode == 0:
-                                               log.debug("Process %s exited normally." % process)
-
-                                       elif process.exitcode > 0:
-                                               log.error("Process did not exit normally: %s code: %s" \
-                                                       % (process, process.exitcode))
-
-                                       elif process.exitcode < 0:
-                                               log.error("Process killed by signal: %s: code: %s" \
-                                                       % (process, process.exitcode))
-
-                                               # If a program has crashed, we send that to the hub.
-                                               job_id = self.pid2jobid.get(process.pid, None)
-                                               if job_id:
-                                                       self.conn.build_job_crashed(job_id, process.exitcode)
-
-                                       # Finally, remove the process from the process list.
-                                       self.processes.remove(process)
-
-                       return ret
-
-               def kill_aborted_jobs(self):
-                       log.debug("Requesting aborted jobs...")
-
-                       # Get a list of running job ids:
-                       running_jobs = self.pid2jobid.values()
-
-                       # If there are no running jobs, there is nothing to do.
-                       if not running_jobs:
-                               return False
-
-                       # Ask the hub for any build jobs to abort.
-                       aborted_jobs = self.conn.build_jobs_aborted(running_jobs)
-
-                       # If no build jobs were aborted, there is nothing to do.
-                       if not aborted_jobs:
-                               return False
-
-                       for process in self.processes[:]:
-                               job_id = self.pid2jobid.get(process.pid, None)
-                               if job_id and job_id in aborted_jobs:
-
-                                       # Kill the process.
-                                       log.info("Killing process %s which was aborted by the user." \
-                                               % process.pid)
-                                       process.terminate()
-
-                                       # Remove the process from the process list to avoid
-                                       # that is will be cleaned up in the normal way.
-                                       self.processes.remove(process)
-
-                       return True
-
-               def fork_builder(self, job):
-                       """
-                               For a new child process to create a new independent builder.
-                       """
-                       # Create the Process object.
-                       process = multiprocessing.Process(target=fork_builder,
-                               args=(self.server, self.hostname, self.__secret, job))
-                       # The process is running in daemon mode so it will try to kill
-                       # all child processes when exiting.
-                       process.daemon = True
-
-                       # Start the process.
-                       process.start()
-                       log.info("Started new process %s with PID %s." % (process, process.pid))
-
-                       # Save the PID and the build id to track down
-                       # crashed builds.
-                       self.pid2jobid[process.pid] = job.get("id", None)
-
-                       # Append it to the process list.
-                       self.processes.append(process)
-
-
-class ClientBuilder(object):
-       def __init__(self, server, hostname, secret, job):
-               self.client = base.PakfireBuilderClient(server, hostname, secret)
-               self.conn   = self.client.conn
-
-               # Store the information sent by the server here.
-               self.build_job = job
-
-       def update_state(self, state, message=None):
-               self.conn.build_job_update_state(self.build_id, state, message)
-
-       def upload_file(self, filename, type):
-               assert os.path.exists(filename)
-               assert type in ("package", "log")
-
-               # First upload the file data and save the upload_id.
-               upload_id = self.client._upload_file(filename)
-
-               # Add the file to the build.
-               return self.conn.build_job_add_file(self.build_id, upload_id, type)
-
-       def upload_buildroot(self, installed_packages):
-               pkgs = []
-
-               for pkg in installed_packages:
-                       assert pkg.uuid, "%s has got no UUID"
-                       pkgs.append((pkg.friendly_name, pkg.uuid))
-
-               return self.conn.build_upload_buildroot(self.build_id, pkgs)
-
-       @property
-       def build_id(self):
-               if self.build_job:
-                       return self.build_job.get("id", None)
-
-       @property
-       def build_arch(self):
-               if self.build_job:
-                       return self.build_job.get("arch", None)
-
-       @property
-       def build_source_url(self):
-               if self.build_job:
-                       return self.build_job.get("source_url", None)
-
-       @property
-       def build_source_filename(self):
-               if self.build_source_url:
-                       return os.path.basename(self.build_source_url)
-
-       @property
-       def build_source_hash512(self):
-               if self.build_job:
-                       return self.build_job.get("source_hash512", None)
-
-       @property
-       def build_type(self):
-               if self.build_job:
-                       return self.build_job.get("type", None)
-
-       @property
-       def build_config(self):
-               if self.build_job:
-                       return self.build_job.get("config", None)
-
-       def build(self):
-               # Cannot go on if I got no build job.
-               if not self.build_job:
-                       logging.info("No job to work on...")
-                       return
-
-               # Call the function that processes the build and try to catch general
-               # exceptions and report them to the server.
-               # If everything goes okay, we tell this the server, too.
-               try:
-                       # Create a temporary file and a directory for the resulting files.
-                       tmpdir  = tempfile.mkdtemp()
-                       tmpfile = os.path.join(tmpdir, self.build_source_filename)
-                       logfile = os.path.join(tmpdir, "build.log")
-                       cfgfile = os.path.join(tmpdir, "job-%s.conf" % self.build_id)
-
-                       # Get a package grabber and add mirror download capabilities to it.
-                       grabber = pakfire.downloader.PackageDownloader(pakfire.config.Config())
-
-                       # Create pakfire configuration instance.
-                       config = pakfire.config.ConfigDaemon()
-                       config.parse(self.build_config)
-
-                       # Create pakfire instance.
-                       p = None
-                       try:
-                               p = pakfire.base.PakfireBuilder(config=config, arch=self.build_arch)
-
-                               # Download the source package.
-                               grabber = pakfire.downloader.PackageDownloader(p)
-                               grabber.urlgrab(self.build_source_url, filename=tmpfile)
-
-                               # Check if the download checksum matches (if provided).
-                               if self.build_source_hash512:
-                                       h = hashlib.new("sha512")
-                                       f = open(tmpfile, "rb")
-                                       while True:
-                                               buf = f.read(BUFFER_SIZE)
-                                               if not buf:
-                                                       break
-
-                                               h.update(buf)
-                                       f.close()
-
-                                       if not self.build_source_hash512 == h.hexdigest():
-                                               raise DownloadError, "Hash check did not succeed."
-
-                               # Create a new instance of a build environment.
-                               build = pakfire.builder.BuildEnviron(p, tmpfile,
-                                       release_build=True, build_id=self.build_id, logfile=logfile)
-
-                               try:
-                                       # Create the build environment.
-                                       build.start()
-
-                                       # Update the build status on the server.
-                                       self.upload_buildroot(build.installed_packages)
-                                       self.update_state("running")
-
-                                       # Run the build (with install test).
-                                       build.build(install_test=True)
-
-                                       # Copy the created packages to the tempdir.
-                                       build.copy_result(tmpdir)
-
-                               finally:
-                                       # Cleanup the build environment.
-                                       build.stop()
-
-                               # Jippie, build is finished, we are going to upload the files.
-                               self.update_state("uploading")
-
-                               # Walk through the result directory and upload all (binary) files.
-                               # Skip that for test builds.
-                               if not self.build_type == "test":
-                                       for dir, subdirs, files in os.walk(tmpdir):
-                                               for file in files:
-                                                       file = os.path.join(dir, file)
-                                                       if file in (logfile, tmpfile,):
-                                                               continue
-
-                                                       self.upload_file(file, "package")
-
-                       except DependencyError, e:
-                               message = "%s: %s" % (e.__class__.__name__, e)
-                               self.update_state("dependency_error", message)
-                               raise
-
-                       except DownloadError, e:
-                               message = "%s: %s" % (e.__class__.__name__, e)
-                               self.update_state("download_error", message)
-                               raise
-
-                       finally:
-                               if p:
-                                       p.destroy()
-
-                               # Upload the logfile in any case and if it exists.
-                               if os.path.exists(logfile):
-                                       self.upload_file(logfile, "log")
-
-                               # Cleanup the files we created.
-                               pakfire.util.rm(tmpdir)
-
-               except DependencyError:
-                       # This has already been reported.
-                       raise
-
-               except (DownloadError,):
-                       # Do not take any further action for these exceptions.
-                       pass
-
-               except Exception, e:
-                       # Format the exception and send it to the server.
-                       message = "%s: %s" % (e.__class__.__name__, e)
-
-                       self.update_state("failed", message)
-                       raise
-
-               else:
-                       self.update_state("finished")
diff --git a/python/pakfire/client/transport.py b/python/pakfire/client/transport.py
deleted file mode 100644 (file)
index 319c48c..0000000
+++ /dev/null
@@ -1,158 +0,0 @@
-#!/usr/bin/python
-###############################################################################
-#                                                                             #
-# Pakfire - The IPFire package management system                              #
-# Copyright (C) 2011 Pakfire development team                                 #
-#                                                                             #
-# This program is free software: you can redistribute it and/or modify        #
-# it under the terms of the GNU General Public License as published by        #
-# the Free Software Foundation, either version 3 of the License, or           #
-# (at your option) any later version.                                         #
-#                                                                             #
-# This program is distributed in the hope that it will be useful,             #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
-# GNU General Public License for more details.                                #
-#                                                                             #
-# You should have received a copy of the GNU General Public License           #
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
-#                                                                             #
-###############################################################################
-
-import httplib
-import socket
-import ssl
-import time
-import xmlrpclib
-
-import logging
-log = logging.getLogger("pakfire.client")
-
-from pakfire.constants import *
-from pakfire.i18n import _
-
-# Set the default socket timeout to 30 seconds.
-socket.setdefaulttimeout(30)
-
-
-
-
-class XMLRPCMixin:
-       user_agent = "pakfire/%s" % PAKFIRE_VERSION
-
-       def single_request(self, *args, **kwargs):
-               ret = None
-
-               # Tries can be passed to this method.
-               tries = kwargs.pop("tries", 100)
-               timeout = 1
-
-               while tries:
-                       try:
-                               ret = xmlrpclib.Transport.single_request(self, *args, **kwargs)
-
-                       # Catch errors related to the connection. Just try again.
-                       except (socket.error, ssl.SSLError), e:
-                               log.warning("Exception: %s: %s" % (e.__class__.__name__, e))
-
-                       # Presumably, the server closed the connection before sending anything.
-                       except httplib.BadStatusLine:
-                               # Try again immediately.
-                               continue
-
-                       # The XML reponse could not be parsed.
-                       except xmlrpclib.ResponseError, e:
-                               log.warning("Exception: %s: %s" % (e.__class__.__name__, e))
-
-                       except xmlrpclib.ProtocolError, e:
-                               if e.errcode == 403:
-                                       # Possibly, the user credentials are invalid.
-                                       # Cannot go on.
-                                       raise XMLRPCForbiddenError(e)
-
-                               elif e.errcode == 404:
-                                       # Some invalid URL was called.
-                                       # Cannot go on.
-                                       raise XMLRPCNotFoundError(e)
-
-                               elif e.errcode == 500:
-                                       # This could have various reasons, so we can not
-                                       # be sure to kill connections here.
-                                       # But to visualize the issue, we will raise an
-                                       # exception on the last try.
-                                       if tries == 1:
-                                               raise XMLRPCInternalServerError(e)
-
-                               elif e.errcode == 503:
-                                       # Possibly the hub is not running but the SSL proxy
-                                       # is. Just try again in a short time.
-                                       pass
-
-                               else:
-                                       # Log all XMLRPC protocol errors.
-                                       log.error(_("XMLRPC protocol error:"))
-                                       log.error("  %s" % _("URL: %s") % e.url)
-                                       log.error("  %s" % _("  HTTP headers:"))
-                                       for header in e.headers.items():
-                                               log.error("    %s: %s" % header)
-                                       log.error("  %s" % _("Error code: %s") % e.errcode)
-                                       log.error("  %s" % _("Error message: %s") % e.errmsg)
-
-                                       # If an unhandled error code appeared, we raise an
-                                       # error.
-                                       raise
-
-                       except xmlrpclib.Fault:
-                               raise
-
-                       else:
-                               # If request was successful, we can break the loop.
-                               break
-
-                       # If the request was not successful, we wait a little time to try
-                       # it again.
-                       tries -= 1
-                       timeout *= 2
-                       if timeout > 60:
-                               timeout = 60
-
-                       log.warning(_("Trying again in %(timeout)s second(s). %(tries)s tries left.") \
-                               % { "timeout" : timeout, "tries" : tries })
-                       time.sleep(timeout)
-
-               else:
-                       raise XMLRPCTransportError, _("Maximum number of tries was reached. Giving up.")
-
-               return ret
-
-
-
-class XMLRPCTransport(XMLRPCMixin, xmlrpclib.Transport):
-       """
-               Handles the XMLRPC connection over HTTP.
-       """
-       pass
-
-
-class SafeXMLRPCTransport(XMLRPCMixin, xmlrpclib.SafeTransport):
-       """
-               Handles the XMLRPC connection over HTTPS.
-       """
-       pass
-
-
-class Connection(xmlrpclib.ServerProxy):
-       """
-               Class wrapper that automatically chooses the right transport
-               method depending on the given URL.
-       """
-
-       def __init__(self, url):
-               # Create transport channel to the server.
-               if url.startswith("https://"):
-                       transport = SafeXMLRPCTransport()
-               elif url.startswith("http://"):
-                       transport = XMLRPCTransport()
-
-               xmlrpclib.ServerProxy.__init__(self, url, transport=transport,
-                       allow_none=True)
index 08d9e8c1e8e83b8bb5f74316670980d842cb7118..1b30391eea09aa7865cf46d54d54d64f77114abc 100644 (file)
@@ -253,6 +253,13 @@ class ConfigClient(_Config):
                },
        }
 
+       def get_hub_credentials(self):
+               hub_url  = self.get("client", "server")
+               username = self.get("client", "username")
+               password = self.get("client", "password")
+
+               return hub_url, username, password
+
 
 class ConfigDaemon(_Config):
        files = ["general.conf", "daemon.conf"]
@@ -268,3 +275,10 @@ class ConfigDaemon(_Config):
                        "hostname" : system.hostname,
                },
        }
+
+       def get_hub_credentials(self):
+               hub_url  = self.get("daemon", "server")
+               hostname = self.get("daemon", "hostname")
+               password = self.get("daemon", "secret")
+
+               return hub_url, hostname, password
diff --git a/python/pakfire/daemon.py b/python/pakfire/daemon.py
new file mode 100644 (file)
index 0000000..d6e641d
--- /dev/null
@@ -0,0 +1,509 @@
+#!/usr/bin/python
+
+import hashlib
+import json
+import multiprocessing
+import os
+import signal
+import sys
+import tempfile
+import time
+
+import pakfire.base
+import pakfire.builder
+import pakfire.config
+import pakfire.downloader
+import pakfire.system
+import pakfire.util
+from pakfire.system import system
+
+import base
+import transport
+
+from pakfire.constants import *
+from pakfire.i18n import _
+
+import logging
+log = logging.getLogger("pakfire.daemon")
+
+class BuildJob(dict):
+       """
+               Wrapper class for build jobs, that are received from the hub.
+
+               This makes accessing attributes easier.
+       """
+       def __getattr__(self, key):
+               try:
+                       return self[key]
+               except KeyError:
+                       raise AttributeError, key
+
+
+class PakfireDaemon(object):
+       def __init__(self, config):
+               self.config = config
+
+               # Indicates if this daemon is in running mode.
+               self.__running = True
+
+               # List of worker processes.
+               self.__workers = []
+
+               # Create connection to the hub.
+               self.transport = transport.PakfireHubTransport(self.config)
+
+               ### Configuration
+               # Number of workers in waiting state.
+               self.max_waiting = 1
+
+               # Number of running workers.
+               self.max_running = system.cpu_count * 2
+
+       def run(self, heartbeat=30):
+               """
+                       Main loop.
+               """
+               # Register signal handlers.
+               self.register_signal_handlers()
+
+               # Send our profile to the hub.
+               self.send_builder_info()
+
+               while self.__running:
+                       time_started = time.time()
+
+                       # Send keepalive message.
+                       self.send_keepalive()
+
+                       # Spawn a sufficient number of worker processes.
+                       self.spawn_workers_if_needed()
+
+                       # Get runtime of this loop iteration.
+                       time_elapsed = time.time() - time_started
+
+                       # Wait if the heartbeat time has not been reached, yet.
+                       if time_elapsed < heartbeat:
+                               time.sleep(heartbeat - time_elapsed)
+
+               # Main loop has ended, but we wait until all workers have finished.
+               self.terminate_all_workers()
+
+       def shutdown(self):
+               """
+                       Terminates all workers and exits the daemon.
+               """
+               if not self.__running:
+                       return
+
+               log.info(_("Shutting down..."))
+               self.__running = False
+
+       def spawn_workers_if_needed(self, *args, **kwargs):
+               """
+                       Spawns more workers if needed.
+               """
+               # Do not create any more processes when the daemon is shutting down.
+               if not self.__running:
+                       return
+
+               # Cleanup all other workers.
+               self.cleanup_workers()
+
+               # Do not create more workers if there are already enough workers
+               # active.
+               if len(self.workers) >= self.max_running:
+                       log.warning("More workers running than allowed")
+                       return
+
+               # Do nothing if there are already enough workers waiting.
+               wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
+               if wanted_waiting_workers <= 0:
+                       return
+
+               # Spawn a new worker.
+               for i in range(wanted_waiting_workers):
+                       self.spawn_worker(*args, **kwargs)
+
+       def spawn_worker(self, *args, **kwargs):
+               """
+                       Spawns a new worker process.
+               """
+               worker = PakfireWorker(config=self.config, *args, **kwargs)
+               worker.start()
+
+               log.debug("Spawned new worker process: %s" % worker)
+               self.__workers.append(worker)
+
+       def terminate_worker(self, worker):
+               """
+                       Terminates the given worker.
+               """
+               log.warning(_("Terminating worker process: %s") % worker)
+
+               worker.terminate()
+
+       def terminate_all_workers(self):
+               """
+                       Terminates all workers.
+               """
+               for worker in self.workers:
+                       self.terminate_worker(worker)
+
+                       # Wait until the worker has finished.
+                       worker.join()
+
+       def remove_worker(self, worker):
+               """
+                       Removes a worker from the internal list of worker processes.
+               """
+               assert not worker.is_alive(), "Remove alive worker?"
+
+               log.debug("Removing worker: %s" % worker)
+               try:
+                       self.__workers.remove(worker)
+               except:
+                       pass
+
+       def cleanup_workers(self):
+               """
+                       Remove workers that are not alive any more.
+               """
+               for worker in self.workers:
+                       if worker.is_alive():
+                               continue
+
+                       self.remove_worker(worker)
+
+       @property
+       def workers(self):
+               return self.__workers[:]
+
+       @property
+       def running_workers(self):
+               workers = []
+
+               for worker in self.workers:
+                       if worker.waiting.is_set():
+                               continue
+
+                       workers.append(worker)
+
+               return workers
+
+       @property
+       def waiting_workers(self):
+               workers = []
+
+               for worker in self.workers:
+                       if worker.waiting.is_set():
+                               workers.append(worker)
+
+               return workers
+
+       # Signal handling.
+
+       def register_signal_handlers(self):
+               signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
+               signal.signal(signal.SIGINT,  self.handle_SIGTERM)
+               signal.signal(signal.SIGTERM, self.handle_SIGTERM)
+
+       def handle_SIGCHLD(self, signum, frame):
+               """
+                       Handle signal SIGCHLD.
+               """
+               # Spawn new workers if necessary.
+               self.spawn_workers_if_needed()
+
+       def handle_SIGTERM(self, signum, frame):
+               """
+                       Handle signal SIGTERM.
+               """
+               # Just shut down.
+               self.shutdown()
+
+       # Talking to the hub.
+
+       def send_builder_info(self):
+               log.info(_("Sending builder information to hub..."))
+
+               data = {
+                       # CPU info
+                       "cpu_model"       : system.cpu_model,
+                       "cpu_count"       : system.cpu_count,
+                       "cpu_arch"        : system.native_arch,
+                       "cpu_bogomips"    : system.cpu_bogomips,
+
+                       # Memory + swap
+                       "mem_total"       : system.memory,
+                       "swap_total"      : system.swap_total,
+
+                       # Pakfire + OS
+                       "pakfire_version" : PAKFIRE_VERSION,
+                       "host_key"        : self.config.get("signatures", "host_key", None),
+                       "os_name"         : system.distro.pretty_name,
+
+                       # Supported arches
+                       "supported_arches" : ",".join(system.supported_arches),
+               }
+               self.transport.post("/builders/info", data=data)
+
+       def send_keepalive(self):
+               log.debug("Sending keepalive message to hub...")
+
+               data = {
+                       # Load average.
+                       "loadavg1"   : system.loadavg1,
+                       "loadavg5"   : system.loadavg5,
+                       "loadavg15"  : system.loadavg15,
+
+                       # Memory
+                       "mem_total"  : system.memory_total,
+                       "mem_free"   : system.memory_free,
+
+                       # Swap
+                       "swap_total" : system.swap_total,
+                       "swap_free"  : system.swap_free,
+
+                       # Disk space
+                       "space_free" : self.free_space,
+               }
+               self.transport.post("/builders/keepalive", data=data)
+
+       @property
+       def free_space(self):
+               mp = system.get_mountpoint(BUILD_ROOT)
+
+               return mp.space_left
+
+
+class PakfireWorker(multiprocessing.Process):
+       def __init__(self, config, waiting=None):
+               multiprocessing.Process.__init__(self)
+
+               # Save config.
+               self.config = config
+
+               # Waiting event. Clear if this worker is running a build.
+               self.waiting = multiprocessing.Event()
+               self.waiting.set()
+
+               # Indicates if this worker is running.
+               self.__running = True
+
+       def run(self):
+               # Register signal handlers.
+               self.register_signal_handlers()
+
+               # Create connection to the hub.
+               self.transport = transport.PakfireHubTransport(self.config)
+               self.transport.fork()
+
+               while self.__running:
+                       # Try to get a new build job.
+                       job = self.get_new_build_job()
+                       if not job:
+                               continue
+
+                       # If we got a job, we are not waiting anymore.
+                       self.waiting.clear()
+
+                       # Run the job and return.
+                       return self.execute_job(job)
+
+       def shutdown(self):
+               self.__running = False
+
+               # When we are just waiting, we can exit right away.
+               if self.waiting.is_set():
+                       log.debug("Exiting immediately")
+                       sys.exit(1)
+
+               # XXX figure out what to do, when a build is running.
+
+       # Signal handling.
+
+       def register_signal_handlers(self):
+               signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
+               signal.signal(signal.SIGINT,  self.handle_SIGTERM)
+               signal.signal(signal.SIGTERM, self.handle_SIGTERM)
+
+       def handle_SIGCHLD(self, signum, frame):
+               """
+                       Handle signal SIGCHLD.
+               """
+               # Must be here so that SIGCHLD won't be propagated to
+               # PakfireDaemon.
+               pass
+
+       def handle_SIGTERM(self, signum, frame):
+               """
+                       Handle signal SIGTERM.
+               """
+               self.shutdown()
+
+       def get_new_build_job(self, timeout=600):
+               log.debug("Requesting new job...")
+
+               try:
+                       job = self.transport.get_json("/builders/jobs/queue",
+                               data={ "timeout" : timeout, }, timeout=timeout)
+
+                       if job:
+                               return BuildJob(job)
+
+               # As this is a long poll request, it is okay to time out.
+               except TransportMaxTriesExceededError:
+                       pass
+
+       def execute_job(self, job):
+               log.debug("Executing job: %s" % job)
+
+               # Call the function that processes the build and try to catch general
+               # exceptions and report them to the server.
+               # If everything goes okay, we tell this the server, too.
+               try:
+                       # Create a temporary file and a directory for the resulting files.
+                       tmpdir  = tempfile.mkdtemp()
+                       tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
+                       logfile = os.path.join(tmpdir, "build.log")
+
+                       # Create pakfire configuration instance.
+                       config = pakfire.config.ConfigDaemon()
+                       config.parse(job.config)
+
+                       # Create pakfire instance.
+                       p = None
+                       try:
+                               p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)
+
+                               # Download the source package.
+                               grabber = pakfire.downloader.PackageDownloader(p)
+                               grabber.urlgrab(job.source_url, filename=tmpfile)
+
+                               # Check if the download checksum matches (if provided).
+                               if job.source_hash_sha512:
+                                       h = hashlib.new("sha512")
+                                       f = open(tmpfile, "rb")
+                                       while True:
+                                               buf = f.read(BUFFER_SIZE)
+                                               if not buf:
+                                                       break
+
+                                               h.update(buf)
+                                       f.close()
+
+                                       if not job.source_hash_sha512 == h.hexdigest():
+                                               raise DownloadError, "Hash check did not succeed."
+
+                               # Create a new instance of a build environment.
+                               build = pakfire.builder.BuildEnviron(p, tmpfile,
+                                       release_build=True, build_id=job.id, logfile=logfile)
+
+                               try:
+                                       # Create the build environment.
+                                       build.start()
+
+                                       # Update the build status on the server.
+                                       self.upload_buildroot(job, build.installed_packages)
+                                       self.update_state(job, "running")
+
+                                       # Run the build (without install test).
+                                       build.build(install_test=False)
+
+                                       # Copy the created packages to the tempdir.
+                                       build.copy_result(tmpdir)
+
+                               finally:
+                                       # Cleanup the build environment.
+                                       build.stop()
+
+                               # Jippie, build is finished, we are going to upload the files.
+                               self.update_state(job, "uploading")
+
+                               # Walk through the result directory and upload all (binary) files.
+                               # Skip that for test builds.
+                               if not job.type == "test":
+                                       for dir, subdirs, files in os.walk(tmpdir):
+                                               for file in files:
+                                                       file = os.path.join(dir, file)
+                                                       if file in (logfile, tmpfile,):
+                                                               continue
+
+                                                       self.upload_file(job, file, "package")
+
+                       except DependencyError, e:
+                               message = "%s: %s" % (e.__class__.__name__, e)
+                               self.update_state(job, "dependency_error", message)
+                               raise
+
+                       except DownloadError, e:
+                               message = "%s: %s" % (e.__class__.__name__, e)
+                               self.update_state(job, "download_error", message)
+                               raise
+
+                       finally:
+                               if p:
+                                       p.destroy()
+
+                               # Upload the logfile in any case and if it exists.
+                               if os.path.exists(logfile):
+                                       self.upload_file(job, logfile, "log")
+
+                               # Cleanup the files we created.
+                               pakfire.util.rm(tmpdir)
+
+               except DependencyError:
+                       # This has already been reported.
+                       raise
+
+               except (DownloadError,):
+                       # Do not take any further action for these exceptions.
+                       pass
+
+               except (KeyboardInterrupt, SystemExit):
+                       self.update_state(job, "aborted")
+
+               except Exception, e:
+                       # Format the exception and send it to the server.
+                       message = "%s: %s" % (e.__class__.__name__, e)
+
+                       self.update_state(job, "failed", message)
+                       raise
+
+               else:
+                       self.update_state(job, "finished")
+
+       def update_state(self, job, state, message=None):
+               """
+                       Update state of the build job on the hub.
+               """
+               data = {
+                       "message" : message or "",
+               }
+
+               self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
+                       data=data)
+
+       def upload_file(self, job, filename, type):
+               assert os.path.exists(filename)
+               assert type in ("package", "log")
+
+               # First upload the file data and save the upload_id.
+               upload_id = self.transport.upload_file(filename)
+
+               data = {
+                       "type" : type,
+               }
+
+               # Add the file to the build.
+               self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
+                       data=data)
+
+       def upload_buildroot(self, job, installed_packages):
+               pkgs = []
+               for pkg in installed_packages:
+                       pkgs.append((pkg.friendly_name, pkg.uuid))
+
+               data = { "buildroot" : json.dumps(pkgs) }
+
+               self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)
index d4cc6e7e451bd5e7ceffabad210d51feb3e34058..fae20bb36987c8a170d0f6be3b586c98c0730d3a 100644 (file)
@@ -25,7 +25,7 @@ import re
 import logging
 log = logging.getLogger("pakfire")
 
-from system import system
+import system
 
 class Distribution(object):
        def __init__(self,  data=None):
@@ -47,8 +47,9 @@ class Distribution(object):
                        return
 
                keymap = {
-                       "NAME"       : "name",
-                       "VERSION_ID" : "release",
+                       "NAME"        : "name",
+                       "PRETTY_NAME" : "pretty_name",
+                       "VERSION_ID"  : "release",
                }
 
                data = {}
@@ -109,6 +110,14 @@ class Distribution(object):
        def name(self):
                return self._data.get("name", "unknown")
 
+       @property
+       def pretty_name(self):
+               pretty_name = self._data.get("pretty_name", None)
+               if not pretty_name:
+                       pretty_name = " ".join((self.name, self.release))
+
+               return pretty_name
+
        @property
        def release(self):
                return self._data.get("release", "0")
@@ -134,11 +143,11 @@ class Distribution(object):
                return self._data.get("contact", "N/A")
 
        def get_arch(self):
-               arch = self._data.get("arch", None) or system.arch
+               arch = self._data.get("arch", None) or system.system.arch
 
                # We can not set up a build environment for noarch.
                if arch == "noarch":
-                       arch = system.arch
+                       arch = system.system.arch
 
                return arch
        
@@ -228,7 +237,7 @@ class Distribution(object):
                        None to skip the setting of the personality in the build chroot.
                """
 
-               if self.arch == system.native_arch:
+               if self.arch == system.system.native_arch:
                        return None
 
                arch2personality = {
index 2315fca0b423bb1631420fef41386fd9f003e720..d9808597d3ae288ab18650cc4c7ea971e3760747 100644 (file)
@@ -21,6 +21,7 @@
 
 import json
 import os
+import pycurl
 import random
 
 import logging
@@ -28,6 +29,7 @@ log = logging.getLogger("pakfire")
 
 from config import _Config
 
+import urlgrabber.grabber
 from urlgrabber.grabber import URLGrabber, URLGrabError
 from urlgrabber.mirror import MirrorGroup
 from urlgrabber.progress import TextMeter
@@ -72,6 +74,17 @@ class PakfireGrabber(URLGrabber):
 
                URLGrabber.__init__(self, *args, **kwargs)
 
+       def fork(self):
+               """
+                       Reset Curl object after forking a process.
+               """
+               # XXX this is a very ugly hack and fiddles around with the internals
+                       # of urlgrabber. We should not touch these, but apparently nobody
+               # else uses multiple threads or processes to talk to their servers.
+               # So we simply replace Curl with a new instance without closing
+               # the old one. This should be fixed in urlgrabber and/or pycurl.
+               urlgrabber.grabber._curl_cache = pycurl.Curl()
+
        def check_offline_mode(self):
                offline = self.config.get("downloader", "offline")
                if not offline:
index 3ac7b3c1f1ff7ee334af06248b258e84e8519754..87c64e33a03a717347e6d95a5285583238fba2e5 100644 (file)
@@ -98,6 +98,62 @@ class TransactionCheckError(Error):
        message = _("Transaction test was not successful")
 
 
+class TransportError(Error):
+       pass
+
+
+class TransportConnectionError(TransportError):
+       pass
+
+
+class TransportConnectionDNSError(TransportConnectionError):
+       pass
+
+
+class TransportConnectionProxyError(TransportConnectionError):
+       pass
+
+
+class TransportConnectionReadError(TransportConnectionError):
+       pass
+
+
+class TransportConnectionResetError(TransportConnectionError):
+       pass
+
+
+class TransportConnectionTimeoutError(TransportConnectionError):
+       pass
+
+
+class TransportConnectionWriteError(TransportConnectionError):
+       pass
+
+
+class TransportSSLError(TransportConnectionError):
+       pass
+
+
+class TransportSSLCertificateExpiredError(TransportSSLError):
+       pass
+
+
+class TransportInternalServerError(TransportError):
+       pass
+
+
+class TransportForbiddenError(TransportError):
+       pass
+
+
+class TransportMaxTriesExceededError(TransportError):
+       pass
+
+
+class TransportNotFoundError(TransportError):
+       pass
+
+
 class XMLRPCError(Error):
        message = _("Generic XMLRPC error.")
 
index 6d54d5ffa170bd40e7fc02475076c4ad724e1c8c..9a5d546aa39fe90d921422bcf5f67813e3ad2fa5 100644 (file)
@@ -25,6 +25,8 @@ import multiprocessing
 import os
 import socket
 
+import distro
+
 from i18n import _
 
 class System(object):
@@ -42,6 +44,13 @@ class System(object):
 
                return hn
 
+       @property
+       def distro(self):
+               if not hasattr(self, "_distro"):
+                       self._distro = distro.Distribution()
+
+               return self._distro
+
        @property
        def native_arch(self):
                """
@@ -98,24 +107,32 @@ class System(object):
                """
                return multiprocessing.cpu_count()
 
-       @property
-       def cpu_model(self):
-               # Determine CPU model
-               cpuinfo = {}
+       def parse_cpuinfo(self):
+               ret = {}
+
                with open("/proc/cpuinfo") as f:
                        for line in f.readlines():
-                               # Break at an empty line, because all information after that
-                               # is redundant.
-                               if not line:
+                               # Only parse the first block.
+                               if line == "\n":
                                        break
 
                                try:
-                                       key, value = line.split(":")
+                                       # Split the lines by colons.
+                                       a, b = line.split(":")
+
+                                       # Strip whitespace.
+                                       a = a.strip()
+                                       b = b.strip()
+
+                                       ret[a] = b
                                except:
-                                       pass # Skip invalid lines
+                                       pass
 
-                               key, value = key.strip(), value.strip()
-                               cpuinfo[key] = value
+               return ret
+
+       @property
+       def cpu_model(self):
+               cpuinfo = self.parse_cpuinfo()
 
                ret = None
                if self.arch.startswith("arm"):
@@ -132,20 +149,85 @@ class System(object):
                return ret or _("Could not be determined")
 
        @property
-       def memory(self):
-               # Determine memory size
-               memory = 0
+       def cpu_bogomips(self):
+               cpuinfo = self.parse_cpuinfo()
+
+               bogomips = cpuinfo.get("bogomips", None)
+               try:
+                       return float(bogomips) * self.cpu_count
+               except:
+                       pass
+
+       def get_loadavg(self):
+               return os.getloadavg()
+
+       @property
+       def loadavg1(self):
+               return self.get_loadavg()[0]
+
+       @property
+       def loadavg5(self):
+               return self.get_loadavg()[1]
+
+       @property
+       def loadavg15(self):
+               return self.get_loadavg()[2]
+
+       def has_overload(self):
+               """
+                       Checks if the load average is not too high.
+
+                       Based on this, it is decided whether a new job is taken.
+               """
+               # If there are more than 2 processes in the process queue per CPU
+               # core we will assume that the system has heavy load and do not request
+               # a new job.
+               return self.loadavg5 >= self.cpu_count * 2
+
+       def parse_meminfo(self):
+               ret = {}
+
                with open("/proc/meminfo") as f:
-                       line = f.readline()
+                       for line in f.readlines():
+                               try:
+                                       a, b, c = line.split()
 
-                       try:
-                               a, b, c = line.split()
-                       except:
-                               pass
-                       else:
-                               memory = int(b) * 1024
+                                       a = a.strip()
+                                       a = a.replace(":", "")
+                                       b = int(b)
+
+                                       ret[a] = b * 1024
+                               except:
+                                       pass
+
+               return ret
+
+       @property
+       def memory_total(self):
+               meminfo = self.parse_meminfo()
+
+               return meminfo.get("MemTotal", None)
+
+       # For compatibility
+       memory = memory_total
+
+       @property
+       def memory_free(self):
+               meminfo = self.parse_meminfo()
+
+               return meminfo.get("MemFree", None)
+
+       @property
+       def swap_total(self):
+               meminfo = self.parse_meminfo()
+
+               return meminfo.get("SwapTotal", None)
+
+       @property
+       def swap_free(self):
+               meminfo = self.parse_meminfo()
 
-               return memory
+               return meminfo.get("SwapFree", None)
 
        def get_mountpoint(self, path):
                return Mountpoint(path)
diff --git a/python/pakfire/transport.py b/python/pakfire/transport.py
new file mode 100644 (file)
index 0000000..db760ec
--- /dev/null
@@ -0,0 +1,387 @@
+#!/usr/bin/python
+###############################################################################
+#                                                                             #
+# Pakfire - The IPFire package management system                              #
+# Copyright (C) 2013 Pakfire development team                                 #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+###############################################################################
+
+from __future__ import division
+
+import base64
+import hashlib
+import json
+import os
+import time
+import urlgrabber
+import urllib
+import urlparse
+
+import pakfire.downloader
+import pakfire.util
+
+from pakfire.constants import *
+from pakfire.i18n import _
+
+import logging
+log = logging.getLogger("pakfire.transport")
+
+
+class PakfireHubTransportUploader(object):
+       """
+               Handles the upload of a single file to the hub.
+       """
+
+       def __init__(self, transport, filename):
+               self.transport = transport
+               self.filename = filename
+
+       def get_upload_id(self):
+               """
+                       Gets an upload ID from the pakfire hub.
+               """
+               # Calculate the SHA1 sum of the file to upload.
+               h = hashlib.new("sha1")
+               with open(self.filename, "rb") as f:
+                       while True:
+                               buf = f.read(CHUNK_SIZE)
+                               if not buf:
+                                       break
+
+                               h.update(buf)
+
+               data = {
+                       "filename" : os.path.basename(self.filename),
+                       "filesize" : os.path.getsize(self.filename),
+                       "hash"     : h.hexdigest(),
+               }
+
+               upload_id = self.transport.get("/uploads/create", data=data)
+               log.debug("Got upload id: %s" % upload_id)
+
+               return upload_id
+
+       def send_file(self, upload_id, progress_callback=None):
+               """
+                       Sends the file content to the server.
+
+                       The data is split into chunks, which are
+                       sent one after another.
+               """
+               with open(self.filename, "rb") as f:
+                       # Initial chunk size.
+                       chunk_size = CHUNK_SIZE
+
+                       # Count the already transmitted bytes.
+                       transferred = 0
+
+                       while True:
+                               chunk = f.read(chunk_size)
+                               if not chunk:
+                                       break
+
+                               log.debug("Got chunk of %s bytes" % len(chunk))
+
+                               # Save the time when we started to send this bit.
+                               time_started = time.time()
+
+                               # Send the chunk to the server.
+                               self.send_chunk(upload_id, chunk)
+
+                               # Save the duration after the chunk has been transmitted
+                               # and adjust chunk size to send one chunk per second.
+                               duration = time.time() - time_started
+                               chunk_size = int(chunk_size / duration)
+
+                               # Never let chunk_size drop below CHUNK_SIZE.
+                               if chunk_size < CHUNK_SIZE:
+                                       chunk_size = CHUNK_SIZE
+
+                               # Add up the amount of data sent.
+                               transferred += len(chunk)
+                               if progress_callback:
+                                       progress_callback(transferred)
+
+       def send_chunk(self, upload_id, data):
+               """
+                       Sends a piece of the file to the server.
+               """
+               # Calculate checksum over the chunk data.
+               h = hashlib.new("sha512")
+               h.update(data)
+               chksum = h.hexdigest()
+
+               # Encode data in base64.
+               data = base64.b64encode(data)
+
+               # Send chunk data to the server.
+               self.transport.post("/uploads/%s/sendchunk" % upload_id,
+                       data={ "chksum" : chksum, "data" : data })
+
+       def destroy_upload(self, upload_id):
+               """
+                       Destroys the upload on the server.
+               """
+               self.transport.get("/uploads/%s/destroy" % upload_id)
+
+       def finish_upload(self, upload_id):
+               """
+                       Signals to the server, that the upload has finished.
+               """
+               self.transport.get("/uploads/%s/finished" % upload_id)
+
+       def run(self):
+               upload_id = None
+
+               # Create a progress bar.
+               progress = pakfire.util.make_progress(
+                       os.path.basename(self.filename), os.path.getsize(self.filename), speed=True, eta=True,
+               )
+
+               try:
+                       # Get an upload ID.
+                       upload_id = self.get_upload_id()
+
+                       # Send the file content.
+                       self.send_file(upload_id, progress_callback=progress.update)
+
+               except:
+                       progress.finish()
+
+                       # Remove broken upload from server.
+                       if upload_id:
+                               self.destroy_upload(upload_id)
+
+                       # XXX catch fatal errors
+                       raise
+
+               else:
+                       progress.finish()
+
+                       # If no exception was raised, the upload
+                       # has finished.
+                       self.finish_upload(upload_id)
+
+               # Return the upload id so some code can actually do something
+               # with the file on the server.
+               return upload_id
+
+
+class PakfireHubTransport(object):
+       """
+               Connection to the pakfire hub.
+       """
+
+       def __init__(self, config):
+               self.config = config
+
+               # Create connection to the hub.
+               self.grabber = pakfire.downloader.PakfireGrabber(
+                       self.config, prefix=self.url,
+               )
+
+       def fork(self):
+               return self.grabber.fork()
+
+       @property
+       def url(self):
+               """
+                       Construct a proper URL out of the given
+                       server, username and password.
+
+                       Basically this just adds the credentials
+                       to the URL.
+               """
+               # Get credentials.
+               server, username, password = self.config.get_hub_credentials()
+
+               # Parse the given URL.
+               url = urlparse.urlparse(server)
+               assert url.scheme in ("http", "https")
+
+               # Build new URL.
+               ret = "%s://" % url.scheme
+
+               # Add credentials if provided.
+               if username and password:
+                       ret += "%s:%s@" % (username, password)
+
+               # Add path components.
+               ret += url.netloc
+
+               return ret
+
+       def one_request(self, url, **kwargs):
+               try:
+                       return self.grabber.urlread(url, **kwargs)
+
+               except urlgrabber.grabber.URLGrabError, e:
+                       # Timeout
+                       if e.errno == 12:
+                               raise TransportConnectionTimeoutError, e
+
+                       # Handle common HTTP errors
+                       elif e.errno == 14:
+                               # Connection errors
+                               if e.code == 5:
+                                       raise TransportConnectionProxyError, url
+                               elif e.code == 6:
+                                       raise TransportConnectionDNSError, url
+                               elif e.code == 7:
+                                       raise TransportConnectionResetError, url
+                               elif e.code == 23:
+                                       raise TransportConnectionWriteError, url
+                               elif e.code == 26:
+                                       raise TransportConnectionReadError, url
+
+                               # SSL errors
+                               elif e.code == 52:
+                                       raise TransportSSLCertificateExpiredError, url
+
+                               # HTTP error codes
+                               elif e.code == 403:
+                                       raise TransportForbiddenError, url
+                               elif e.code == 404:
+                                       raise TransportNotFoundError, url
+                               elif e.code == 500:
+                                       raise TransportInternalServerError, url
+
+                       # All other exceptions...
+                       raise
+
+       def request(self, url, tries=None, **kwargs):
+               # tries = None implies wait infinitely
+
+               while tries or tries is None:
+                       if tries:
+                               tries -= 1
+
+                       try:
+                               return self.one_request(url, **kwargs)
+
+                       # 500 - Internal Server Error
+                       except TransportInternalServerError, e:
+                               log.exception("%s" % e.__class__.__name__)
+
+                               # Wait a minute before trying again.
+                               time.sleep(60)
+
+                       # Retry on connection problems.
+                       except TransportConnectionError, e:
+                               log.exception("%s" % e.__class__.__name__)
+
+                               # Wait for 10 seconds.
+                               time.sleep(10)
+
+               raise TransportMaxTriesExceededError
+
+       def escape_args(self, **kwargs):
+               return urllib.urlencode(kwargs)
+
+       def get(self, url, data={}, **kwargs):
+               """
+                       Sends a HTTP GET request to the given URL.
+
+                       All given keyword arguments are considered as form data.
+               """
+               params = self.escape_args(**data)
+
+               if params:
+                       url = "%s?%s" % (url, params)
+
+               return self.request(url, **kwargs)
+
+       def post(self, url, data={}, **kwargs):
+               """
+                       Sends a HTTP POST request to the given URL.
+
+                       All keyword arguments are considered as form data.
+               """
+               params = self.escape_args(**data)
+               if params:
+                       kwargs.update({
+                               "data" : params,
+                       })
+
+               return self.request(url, **kwargs)
+
+       def upload_file(self, filename):
+               """
+                       Uploads the given file to the server.
+               """
+               uploader = PakfireHubTransportUploader(self, filename)
+               upload_id = uploader.run()
+
+               return upload_id
+
+       def get_json(self, *args, **kwargs):
+               res = self.get(*args, **kwargs)
+
+               # Decode JSON.
+               if res:
+                       return json.loads(res)
+
+       ### Misc. actions
+
+       def noop(self):
+               """
+                       No operation. Just to check if the connection is
+                       working. Returns a random number.
+               """
+               return self.get("/noop")
+
+       def test_code(self, error_code):
+               assert error_code >= 100 and error_code <= 999
+
+               self.get("/error/test/%s" % error_code)
+
+       # Build actions
+
+       def build_create(self, filename, build_type, arches=None, distro=None):
+               """
+                       Create a new build on the hub.
+               """
+               assert build_type in ("scratch", "release")
+
+               # XXX Check for permission to actually create a build.
+
+               # Upload the source file to the server.
+               upload_id = self.upload_file(filename)
+
+               data = {
+                       "arches"     : ",".join(arches or []),
+                       "build_type" : build_type,
+                       "distro"     : distro or "",
+                       "upload_id"  : upload_id,
+               }
+
+               # Then create the build.
+               build_id = self.get("/builds/create", data=data)
+
+               return build_id or None
+
+       def build_get(self, build_uuid):
+               return self.get_json("/builds/%s" % build_uuid)
+
+       # Job actions
+
+       def job_get(self, job_uuid):
+               return self.get_json("/jobs/%s" % job_uuid)
+
+       # Package actions
+
+       def package_get(self, package_uuid):
+               return self.get_json("/packages/%s" % package_uuid)