Build some sort of preforked worker model like Apache does.
The XMLRPC backend has been replaced by a proper API for
which urlgrabber is used.
PYTHON_CC = $(CC) -pthread -fPIC
PYTHON_CFLAGS = $(shell python-config --cflags)
-PYTHON_MODULES = pakfire pakfire/client pakfire/packages pakfire/repository
+PYTHON_MODULES = pakfire pakfire/packages pakfire/repository
ifeq "$(DEBIAN)" "1"
PYTHON_DIR = $(LIBDIR)/python$(PYTHON_VERSION)/dist-packages
else
import base
import client
import config
+import daemon
import logger
import packages
import repository
"test" : self.handle_test,
}
- # Read configuration for the pakfire client.
+ # Read configuration.
self.config = config.ConfigClient()
# Create connection to pakfire hub.
- self.client = client.PakfireUserClient(
- self.config.get("client", "server"),
- self.config.get("client", "username"),
- self.config.get("client", "password"),
- )
+ self.client = client.PakfireClient(self.config)
@property
def pakfire_args(self):
# Format arches.
if self.args.arch:
- arches = self.args.arch.replace(",", " ")
+ arches = self.args.arch.split(",")
else:
arches = None
# Create a new build on the server.
- build = self.client.build_create(package, arches=arches)
-
- # XXX Print the resulting build.
- print build
+ build_id = self.client.build_create(package, build_type="scratch",
+ arches=arches)
finally:
# Cleanup the temporary directory and all files.
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)
+ # Monitor the build.
+ if build_id:
+ self.watch_build(build_id)
+
def handle_info(self):
ret = []
res = self.client.test_code(error_code)
print _("Reponse from the server: %s") % res
+ def watch_build(self, build_id):
+ print self.client.build_get(build_id)
+ # XXX TODO
+ print build_id
+
class CliDaemon(Cli):
def __init__(self):
Runs the pakfire daemon with provided settings.
"""
# Read the configuration file for the daemon.
- conf = config.ConfigDaemon()
+ self.config = config.ConfigDaemon()
+ logger.setup_logging(self.config)
# Create daemon instance.
- d = client.PakfireDaemon()
+ d = daemon.PakfireDaemon(self.config)
try:
d.run()
--- /dev/null
+#!/usr/bin/python
+###############################################################################
+# #
+# Pakfire - The IPFire package management system #
+# Copyright (C) 2013 Pakfire development team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+import transport
+
+from pakfire.constants import *
+from pakfire.i18n import _
+
+import logging
+log = logging.getLogger("pakfire.client")
+
+class PakfireClient(object):
+ def __init__(self, config):
+ self.config = config
+
+ # Create connection to the hub.
+ self.transport = transport.PakfireHubTransport(self.config)
+
+ def build_create(self, *args, **kwargs):
+ return self.transport.build_create(*args, **kwargs)
+
+ def build_get(self, *args, **kwargs):
+ return self.transport.build_get(*args, **kwargs)
+
+ def job_get(self, *args, **kwargs):
+ return self.transport.job_get(*args, **kwargs)
+
+ def package_get(self, *args, **kwargs):
+ return self.transport.package_get(*args, **kwargs)
+
+# XXX OLD CODE
+
+class PakfireUserClient(PakfireClient):
+ type = "user"
+
+ def check_auth(self):
+ """
+ Check if the user was successfully authenticated.
+ """
+ return self.conn.check_auth()
+
+ def get_user_profile(self):
+ """
+ Get information about the user profile.
+ """
+ return self.conn.get_user_profile()
+
+ def get_builds(self, type=None, limit=10, offset=0):
+ return self.conn.get_builds(type=type, limit=limit, offset=offset)
+
+ def get_build(self, build_id):
+ return self.conn.get_build(build_id)
+
+ def get_builder(self, builder_id):
+ return self.conn.get_builder(builder_id)
+
+ def get_job(self, job_id):
+ return self.conn.get_job(job_id)
+
+ def get_latest_jobs(self):
+ return self.conn.get_latest_jobs()
+
+ def get_active_jobs(self):
+ return self.conn.get_active_jobs()
+
+++ /dev/null
-#!/usr/bin/python
-
-from base import PakfireUserClient, PakfireBuilderClient
-from builder import PakfireDaemon, ClientBuilder
+++ /dev/null
-#!/usr/bin/python
-
-from __future__ import division
-
-import os
-import socket
-import urlparse
-import xmlrpclib
-
-import pakfire.util
-import pakfire.packages as packages
-from pakfire.system import system
-
-# Local modules.
-import transport
-
-from pakfire.constants import *
-from pakfire.i18n import _
-
-import logging
-log = logging.getLogger("pakfire.client")
-
-class PakfireClient(object):
- type = None
-
- def __init__(self, server, username, password):
- self.url = self._join_url(server, username, password)
-
- # Create a secure XMLRPC connection to the server.
- self.conn = transport.Connection(self.url)
-
- def _join_url(self, server, username, password):
- """
- Construct a right URL out of the given
- server, username and password.
-
- Basicly this just adds the credentials
- to the URL.
- """
- assert self.type
-
- # Parse the given URL.
- url = urlparse.urlparse(server)
- assert url.scheme in ("http", "https")
-
- # Build new URL.
- ret = "%s://" % url.scheme
-
- # Add credentials if provided.
- if username and password:
- ret += "%s:%s@" % (username, password)
-
- # Add host and path components.
- ret += "/".join((url.netloc, self.type))
-
- return ret
-
- ### Misc. actions
-
- def noop(self):
- """
- No operation. Just to check if the connection is
- working. Returns a random number.
- """
- return self.conn.noop()
-
- def test_code(self, error_code):
- assert error_code >= 100 and error_code <= 999
-
- return self.conn.test_code(error_code)
-
- def get_my_address(self):
- """
- Get my own address (as seen by the hub).
- """
- return self.conn.get_my_address()
-
- def get_hub_status(self):
- """
- Get some status information about the hub.
- """
- return self.conn.get_hub_status()
-
-
-class BuildMixin(object):
- ### Build actions
-
- def build_create(self, filename, arches=None, distro=None):
- """
- Create a new build on the hub.
- """
-
- # Upload the source file to the server.
- upload_id = self._upload_file(filename)
-
- # Then create the build.
- build = self.conn.build_create(upload_id, distro, arches)
-
- print build
-
- def _upload_file(self, filename):
- # Get the hash of the file.
- hash = pakfire.util.calc_hash1(filename)
-
- # Get the size of the file.
- size = os.path.getsize(filename)
-
- # Get an upload ID from the server.
- upload_id = self.conn.upload_create(os.path.basename(filename),
- size, hash)
-
- # Make a nice progressbar.
- pb = pakfire.util.make_progress(os.path.basename(filename), size, speed=True, eta=True)
-
- try:
- # Calculate the number of chunks.
- chunks = (size // CHUNK_SIZE) + 1
- transferred = 0
-
- # Cut the file in pieces and upload them one after another.
- with open(filename) as f:
- chunk = 0
- while True:
- data = f.read(CHUNK_SIZE)
- if not data:
- break
-
- chunk += 1
- if pb:
- transferred += len(data)
- pb.update(transferred)
-
- data = xmlrpclib.Binary(data)
- self.conn.upload_chunk(upload_id, data)
-
- # Tell the server, that we finished the upload.
- ret = self.conn.upload_finished(upload_id)
-
- except:
- # If anything goes wrong, try to delete the upload and raise
- # the exception.
- self.conn.upload_remove(upload_id)
-
- raise
-
- finally:
- if pb:
- pb.finish()
-
- # If the server sends false, something happened with the upload that
- # could not be recovered.
- if not ret:
- logging.error("Upload of %s was not successful." % filename)
- raise Exception, "Upload failed."
-
- return upload_id
-
-
-class PakfireUserClient(BuildMixin, PakfireClient):
- type = "user"
-
- def check_auth(self):
- """
- Check if the user was successfully authenticated.
- """
- return self.conn.check_auth()
-
- def get_user_profile(self):
- """
- Get information about the user profile.
- """
- return self.conn.get_user_profile()
-
- def get_builds(self, type=None, limit=10, offset=0):
- return self.conn.get_builds(type=type, limit=limit, offset=offset)
-
- def get_build(self, build_id):
- return self.conn.get_build(build_id)
-
- def get_builder(self, builder_id):
- return self.conn.get_builder(builder_id)
-
- def get_job(self, job_id):
- return self.conn.get_job(job_id)
-
- def get_latest_jobs(self):
- return self.conn.get_latest_jobs()
-
- def get_active_jobs(self):
- return self.conn.get_active_jobs()
-
-
-class PakfireBuilderClient(BuildMixin, PakfireClient):
- type = "builder"
-
- def send_keepalive(self, force=False, overload=None, free_space=None):
- """
- Sends a little keepalive to the server and
- updates the hardware information if the server
- requests it.
- """
- log.debug("Sending keepalive to the hub.")
-
- # Collect the current loadavg and send it to the hub.
- loadavg = ", ".join(("%.2f" % round(l, 2) for l in os.getloadavg()))
-
- try:
- needs_update = self.conn.send_keepalive(loadavg, overload, free_space)
-
- except XMLRPCInternalServerError:
- # If the keepalive message could not successfully be sent, we don't
- # bother, because the client will soon retry.
- log.warning(_("Could not send a keepalive message to the hub."))
-
- return
-
- if force or needs_update:
- log.debug("The hub is requesting an update.")
- self.send_update()
-
- def send_update(self):
- log.info("Sending host information update to hub...")
-
- config = pakfire.config.ConfigDaemon()
-
- try:
- self.conn.send_update(
- # Supported architectures.
- system.supported_arches,
-
- # CPU information.
- system.cpu_model,
- system.cpu_count,
-
- # Amount of memory in bytes.
- system.memory / 1024,
-
- # Send the currently running version of Pakfire.
- PAKFIRE_VERSION,
-
- # Send the host key.
- config.get("signatures", "host_key", None),
- )
-
- except XMLRPCInternalServerError:
- # Don't give a shit either.
- log.warning(_("Could not update the host information."))
-
- return
+++ /dev/null
-#!/usr/bin/python
-
-import hashlib
-import multiprocessing
-import os
-import sys
-import tempfile
-import time
-
-import pakfire.base
-import pakfire.builder
-import pakfire.config
-import pakfire.downloader
-import pakfire.system
-import pakfire.util
-from pakfire.system import system
-
-import base
-
-from pakfire.constants import *
-from pakfire.i18n import _
-
-import logging
-log = logging.getLogger("pakfire.client")
-
-def fork_builder(*args, **kwargs):
- """
- Wrapper that runs ClientBuilder in a new process and catches
- any exception to report it to the main process.
- """
- try:
- # Create new instance of the builder.
- cb = ClientBuilder(*args, **kwargs)
-
- # Run the build:
- cb.build()
-
- except Exception, e:
- # XXX catch the exception and log it.
- print e
-
- # End the process with an exit code.
- sys.exit(1)
-
-
-class PakfireDaemon(object):
- """
- The PakfireDaemon class that creates a a new process per build
- job and also handles the keepalive/abort stuff.
- """
- def __init__(self):
- self.config = pakfire.config.ConfigDaemon()
-
- server = self.config.get("daemon", "server")
- hostname = self.config.get("daemon", "hostname")
- secret = self.config.get("daemon", "secret")
-
- self.client = base.PakfireBuilderClient(server, hostname, secret)
- self.conn = self.client.conn
-
- # Save login data (to create child processes).
- self.server = server
- self.hostname = hostname
- self.__secret = secret
-
- # A list with all running processes.
- self.processes = []
- self.pid2jobid = {}
-
- # Save when last keepalive was sent.
- self._last_keepalive = 0
-
- # Send an initial keepalive message.
- self.send_keepalive(force=True)
-
- def run(self, heartbeat=1, max_processes=None):
- # By default do not start more than two processes per CPU core.
- if max_processes is None:
- max_processes = system.cpu_count * 2
- log.debug("Maximum number of simultaneous processes is: %s" % max_processes)
-
- # Indicates when to try to request a new job or aborted builds.
- last_job_request = 0
- last_abort_request = 0
-
- # Main loop.
- while True:
- # Send the keepalive regularly.
- self.send_keepalive()
-
- # Remove all finished builds.
- # "removed" indicates, if a process has actually finished.
- removed = self.remove_finished_builders()
-
- # If a build slot was freed, search immediately for a new job.
- if removed:
- last_job_request = 0
-
- # Kill aborted jobs.
- if time.time() - last_abort_request >= 60:
- aborted = self.kill_aborted_jobs()
-
- # If a build slot was freed, search immediately for a new job.
- if aborted:
- last_job_request = 0
-
- last_abort_request = time.time()
-
- # Check if the maximum number of processes was reached.
- # Actually the hub does manage this but this is an emergency
- # condition if anything goes wrong.
- if self.num_processes >= max_processes:
- log.debug("Reached maximum number of allowed processes (%s)." % max_processes)
-
- time.sleep(heartbeat)
- continue
-
- # Get new job.
- if time.time() - last_job_request >= 60 and not self.has_overload():
- # If the last job request is older than a minute and we don't
- # have too much load, we go and check if there is something
- # to do for us.
- job = self.get_job()
-
- # If we got a job, we start a child process to work on it.
- if job:
- log.debug("Got a new job.")
- self.fork_builder(job)
- else:
- log.debug("No new job.")
-
- # Update the time when we requested a job.
- last_job_request = time.time()
-
- # Wait a moment before starting over.
- time.sleep(heartbeat)
-
- def shutdown(self):
- """
- Shut down the daemon.
- This means to kill all child processes.
-
- The method blocks until all processes are shut down.
- """
- for process in self.processes:
- log.info("Sending %s to terminate..." % process)
-
- process.terminate()
- else:
- log.info("No processes to kill. Shutting down immediately.")
-
- while self.processes:
- log.debug("%s process(es) is/are still running..." % len(self.processes))
-
- for process in self.processes[:]:
- if not process.is_alive():
- # The process has terminated.
- log.info("Process %s terminated with exit code: %s" % \
- (process, process.exitcode))
-
- self.processes.remove(process)
-
- @property
- def num_processes(self):
- # Return the number of processes.
- return len(self.processes)
-
- @property
- def free_space(self):
- mp = system.get_mountpoint(BUILD_ROOT)
-
- return mp.space_left
-
- def get_job(self):
- """
- Get a build job from the hub.
- """
- if not self.free_space >= 2 * 1024**3:
- log.warning(_("Less than 2GB of free space. Cannot request a new job."))
- return
-
- log.info("Requesting a new job from the server...")
-
- # Get some information about this system.
- s = pakfire.system.System()
-
- # Fetch a build job from the hub.
- return self.client.conn.build_get_job(s.supported_arches)
-
- def has_overload(self):
- """
- Checks, if the load average is not too high.
-
- On this is to be decided if a new job is taken.
- """
- try:
- load1, load5, load15 = os.getloadavg()
- except OSError:
- # Could not determine the current loadavg. In that case we
- # assume that we don't have overload.
- return False
-
- # If there are more than 2 processes in the process queue per CPU
- # core we will assume that the system has heavy load and to not request
- # a new job.
- return load5 >= system.cpu_count * 2
-
- def send_keepalive(self, force=False):
- """
- When triggered, this method sends a keepalive to the hub.
- """
- # Do not send a keepalive more often than twice a minute.
- if time.time() - self._last_keepalive < 30:
- return
-
- kwargs = {
- "force" : force,
- "overload" : self.has_overload(),
- "free_space" : self.free_space / 1024**2,
- }
-
- self.client.send_keepalive(**kwargs)
- self._last_keepalive = time.time()
-
- def remove_finished_builders(self):
- # Return if any processes have been removed.
- ret = False
-
- # Search for any finished processes.
- for process in self.processes[:]:
- # If the process is not alive anymore...
- if not process.is_alive():
- ret = True
-
- # ... check the exit code and log a message on errors.
- if process.exitcode == 0:
- log.debug("Process %s exited normally." % process)
-
- elif process.exitcode > 0:
- log.error("Process did not exit normally: %s code: %s" \
- % (process, process.exitcode))
-
- elif process.exitcode < 0:
- log.error("Process killed by signal: %s: code: %s" \
- % (process, process.exitcode))
-
- # If a program has crashed, we send that to the hub.
- job_id = self.pid2jobid.get(process.pid, None)
- if job_id:
- self.conn.build_job_crashed(job_id, process.exitcode)
-
- # Finally, remove the process from the process list.
- self.processes.remove(process)
-
- return ret
-
- def kill_aborted_jobs(self):
- log.debug("Requesting aborted jobs...")
-
- # Get a list of running job ids:
- running_jobs = self.pid2jobid.values()
-
- # If there are no running jobs, there is nothing to do.
- if not running_jobs:
- return False
-
- # Ask the hub for any build jobs to abort.
- aborted_jobs = self.conn.build_jobs_aborted(running_jobs)
-
- # If no build jobs were aborted, there is nothing to do.
- if not aborted_jobs:
- return False
-
- for process in self.processes[:]:
- job_id = self.pid2jobid.get(process.pid, None)
- if job_id and job_id in aborted_jobs:
-
- # Kill the process.
- log.info("Killing process %s which was aborted by the user." \
- % process.pid)
- process.terminate()
-
- # Remove the process from the process list to avoid
- # that is will be cleaned up in the normal way.
- self.processes.remove(process)
-
- return True
-
- def fork_builder(self, job):
- """
- For a new child process to create a new independent builder.
- """
- # Create the Process object.
- process = multiprocessing.Process(target=fork_builder,
- args=(self.server, self.hostname, self.__secret, job))
- # The process is running in daemon mode so it will try to kill
- # all child processes when exiting.
- process.daemon = True
-
- # Start the process.
- process.start()
- log.info("Started new process %s with PID %s." % (process, process.pid))
-
- # Save the PID and the build id to track down
- # crashed builds.
- self.pid2jobid[process.pid] = job.get("id", None)
-
- # Append it to the process list.
- self.processes.append(process)
-
-
-class ClientBuilder(object):
- def __init__(self, server, hostname, secret, job):
- self.client = base.PakfireBuilderClient(server, hostname, secret)
- self.conn = self.client.conn
-
- # Store the information sent by the server here.
- self.build_job = job
-
- def update_state(self, state, message=None):
- self.conn.build_job_update_state(self.build_id, state, message)
-
- def upload_file(self, filename, type):
- assert os.path.exists(filename)
- assert type in ("package", "log")
-
- # First upload the file data and save the upload_id.
- upload_id = self.client._upload_file(filename)
-
- # Add the file to the build.
- return self.conn.build_job_add_file(self.build_id, upload_id, type)
-
- def upload_buildroot(self, installed_packages):
- pkgs = []
-
- for pkg in installed_packages:
- assert pkg.uuid, "%s has got no UUID"
- pkgs.append((pkg.friendly_name, pkg.uuid))
-
- return self.conn.build_upload_buildroot(self.build_id, pkgs)
-
- @property
- def build_id(self):
- if self.build_job:
- return self.build_job.get("id", None)
-
- @property
- def build_arch(self):
- if self.build_job:
- return self.build_job.get("arch", None)
-
- @property
- def build_source_url(self):
- if self.build_job:
- return self.build_job.get("source_url", None)
-
- @property
- def build_source_filename(self):
- if self.build_source_url:
- return os.path.basename(self.build_source_url)
-
- @property
- def build_source_hash512(self):
- if self.build_job:
- return self.build_job.get("source_hash512", None)
-
- @property
- def build_type(self):
- if self.build_job:
- return self.build_job.get("type", None)
-
- @property
- def build_config(self):
- if self.build_job:
- return self.build_job.get("config", None)
-
- def build(self):
- # Cannot go on if I got no build job.
- if not self.build_job:
- logging.info("No job to work on...")
- return
-
- # Call the function that processes the build and try to catch general
- # exceptions and report them to the server.
- # If everything goes okay, we tell this the server, too.
- try:
- # Create a temporary file and a directory for the resulting files.
- tmpdir = tempfile.mkdtemp()
- tmpfile = os.path.join(tmpdir, self.build_source_filename)
- logfile = os.path.join(tmpdir, "build.log")
- cfgfile = os.path.join(tmpdir, "job-%s.conf" % self.build_id)
-
- # Get a package grabber and add mirror download capabilities to it.
- grabber = pakfire.downloader.PackageDownloader(pakfire.config.Config())
-
- # Create pakfire configuration instance.
- config = pakfire.config.ConfigDaemon()
- config.parse(self.build_config)
-
- # Create pakfire instance.
- p = None
- try:
- p = pakfire.base.PakfireBuilder(config=config, arch=self.build_arch)
-
- # Download the source package.
- grabber = pakfire.downloader.PackageDownloader(p)
- grabber.urlgrab(self.build_source_url, filename=tmpfile)
-
- # Check if the download checksum matches (if provided).
- if self.build_source_hash512:
- h = hashlib.new("sha512")
- f = open(tmpfile, "rb")
- while True:
- buf = f.read(BUFFER_SIZE)
- if not buf:
- break
-
- h.update(buf)
- f.close()
-
- if not self.build_source_hash512 == h.hexdigest():
- raise DownloadError, "Hash check did not succeed."
-
- # Create a new instance of a build environment.
- build = pakfire.builder.BuildEnviron(p, tmpfile,
- release_build=True, build_id=self.build_id, logfile=logfile)
-
- try:
- # Create the build environment.
- build.start()
-
- # Update the build status on the server.
- self.upload_buildroot(build.installed_packages)
- self.update_state("running")
-
- # Run the build (with install test).
- build.build(install_test=True)
-
- # Copy the created packages to the tempdir.
- build.copy_result(tmpdir)
-
- finally:
- # Cleanup the build environment.
- build.stop()
-
- # Jippie, build is finished, we are going to upload the files.
- self.update_state("uploading")
-
- # Walk through the result directory and upload all (binary) files.
- # Skip that for test builds.
- if not self.build_type == "test":
- for dir, subdirs, files in os.walk(tmpdir):
- for file in files:
- file = os.path.join(dir, file)
- if file in (logfile, tmpfile,):
- continue
-
- self.upload_file(file, "package")
-
- except DependencyError, e:
- message = "%s: %s" % (e.__class__.__name__, e)
- self.update_state("dependency_error", message)
- raise
-
- except DownloadError, e:
- message = "%s: %s" % (e.__class__.__name__, e)
- self.update_state("download_error", message)
- raise
-
- finally:
- if p:
- p.destroy()
-
- # Upload the logfile in any case and if it exists.
- if os.path.exists(logfile):
- self.upload_file(logfile, "log")
-
- # Cleanup the files we created.
- pakfire.util.rm(tmpdir)
-
- except DependencyError:
- # This has already been reported.
- raise
-
- except (DownloadError,):
- # Do not take any further action for these exceptions.
- pass
-
- except Exception, e:
- # Format the exception and send it to the server.
- message = "%s: %s" % (e.__class__.__name__, e)
-
- self.update_state("failed", message)
- raise
-
- else:
- self.update_state("finished")
+++ /dev/null
-#!/usr/bin/python
-###############################################################################
-# #
-# Pakfire - The IPFire package management system #
-# Copyright (C) 2011 Pakfire development team #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <http://www.gnu.org/licenses/>. #
-# #
-###############################################################################
-
-import httplib
-import socket
-import ssl
-import time
-import xmlrpclib
-
-import logging
-log = logging.getLogger("pakfire.client")
-
-from pakfire.constants import *
-from pakfire.i18n import _
-
-# Set the default socket timeout to 30 seconds.
-socket.setdefaulttimeout(30)
-
-
-
-
-class XMLRPCMixin:
- user_agent = "pakfire/%s" % PAKFIRE_VERSION
-
- def single_request(self, *args, **kwargs):
- ret = None
-
- # Tries can be passed to this method.
- tries = kwargs.pop("tries", 100)
- timeout = 1
-
- while tries:
- try:
- ret = xmlrpclib.Transport.single_request(self, *args, **kwargs)
-
- # Catch errors related to the connection. Just try again.
- except (socket.error, ssl.SSLError), e:
- log.warning("Exception: %s: %s" % (e.__class__.__name__, e))
-
- # Presumably, the server closed the connection before sending anything.
- except httplib.BadStatusLine:
- # Try again immediately.
- continue
-
- # The XML reponse could not be parsed.
- except xmlrpclib.ResponseError, e:
- log.warning("Exception: %s: %s" % (e.__class__.__name__, e))
-
- except xmlrpclib.ProtocolError, e:
- if e.errcode == 403:
- # Possibly, the user credentials are invalid.
- # Cannot go on.
- raise XMLRPCForbiddenError(e)
-
- elif e.errcode == 404:
- # Some invalid URL was called.
- # Cannot go on.
- raise XMLRPCNotFoundError(e)
-
- elif e.errcode == 500:
- # This could have various reasons, so we can not
- # be sure to kill connections here.
- # But to visualize the issue, we will raise an
- # exception on the last try.
- if tries == 1:
- raise XMLRPCInternalServerError(e)
-
- elif e.errcode == 503:
- # Possibly the hub is not running but the SSL proxy
- # is. Just try again in a short time.
- pass
-
- else:
- # Log all XMLRPC protocol errors.
- log.error(_("XMLRPC protocol error:"))
- log.error(" %s" % _("URL: %s") % e.url)
- log.error(" %s" % _(" HTTP headers:"))
- for header in e.headers.items():
- log.error(" %s: %s" % header)
- log.error(" %s" % _("Error code: %s") % e.errcode)
- log.error(" %s" % _("Error message: %s") % e.errmsg)
-
- # If an unhandled error code appeared, we raise an
- # error.
- raise
-
- except xmlrpclib.Fault:
- raise
-
- else:
- # If request was successful, we can break the loop.
- break
-
- # If the request was not successful, we wait a little time to try
- # it again.
- tries -= 1
- timeout *= 2
- if timeout > 60:
- timeout = 60
-
- log.warning(_("Trying again in %(timeout)s second(s). %(tries)s tries left.") \
- % { "timeout" : timeout, "tries" : tries })
- time.sleep(timeout)
-
- else:
- raise XMLRPCTransportError, _("Maximum number of tries was reached. Giving up.")
-
- return ret
-
-
-
-class XMLRPCTransport(XMLRPCMixin, xmlrpclib.Transport):
- """
- Handles the XMLRPC connection over HTTP.
- """
- pass
-
-
-class SafeXMLRPCTransport(XMLRPCMixin, xmlrpclib.SafeTransport):
- """
- Handles the XMLRPC connection over HTTPS.
- """
- pass
-
-
-class Connection(xmlrpclib.ServerProxy):
- """
- Class wrapper that automatically chooses the right transport
- method depending on the given URL.
- """
-
- def __init__(self, url):
- # Create transport channel to the server.
- if url.startswith("https://"):
- transport = SafeXMLRPCTransport()
- elif url.startswith("http://"):
- transport = XMLRPCTransport()
-
- xmlrpclib.ServerProxy.__init__(self, url, transport=transport,
- allow_none=True)
},
}
+ def get_hub_credentials(self):
+ hub_url = self.get("client", "server")
+ username = self.get("client", "username")
+ password = self.get("client", "password")
+
+ return hub_url, username, password
+
class ConfigDaemon(_Config):
files = ["general.conf", "daemon.conf"]
"hostname" : system.hostname,
},
}
+
+ def get_hub_credentials(self):
+ hub_url = self.get("daemon", "server")
+ hostname = self.get("daemon", "hostname")
+ password = self.get("daemon", "secret")
+
+ return hub_url, hostname, password
--- /dev/null
+#!/usr/bin/python
+
+import hashlib
+import json
+import multiprocessing
+import os
+import signal
+import sys
+import tempfile
+import time
+
+import pakfire.base
+import pakfire.builder
+import pakfire.config
+import pakfire.downloader
+import pakfire.system
+import pakfire.util
+from pakfire.system import system
+
+import base
+import transport
+
+from pakfire.constants import *
+from pakfire.i18n import _
+
+import logging
+log = logging.getLogger("pakfire.daemon")
+
+class BuildJob(dict):
+ """
+ Wrapper class for build jobs, that are received from the hub.
+
+ This makes accessing attributes more easy.
+ """
+ def __getattr__(self, key):
+ try:
+ return self[key]
+ except KeyError:
+ raise AttributeError, key
+
+
+class PakfireDaemon(object):
+ def __init__(self, config):
+ self.config = config
+
+ # Indicates if this daemon is in running mode.
+ self.__running = True
+
+ # List of worker processes.
+ self.__workers = []
+
+ # Create connection to the hub.
+ self.transport = transport.PakfireHubTransport(self.config)
+
+ ### Configuration
+ # Number of workers in waiting state.
+ self.max_waiting = 1
+
+ # Number of running workers.
+ self.max_running = system.cpu_count * 2
+
+ def run(self, heartbeat=30):
+ """
+ Main loop.
+ """
+ # Register signal handlers.
+ self.register_signal_handlers()
+
+ # Send our profile to the hub.
+ self.send_builder_info()
+
+ while self.__running:
+ time_started = time.time()
+
+ # Send keepalive message.
+ self.send_keepalive()
+
+ # Spawn a sufficient number of worker processes.
+ self.spawn_workers_if_needed()
+
+ # Get runtime of this loop iteration.
+ time_elapsed = time.time() - time_started
+
+ # Wait if the heartbeat time has not been reached, yet.
+ if time_elapsed < heartbeat:
+ time.sleep(heartbeat - time_elapsed)
+
+ # Main loop has ended, but we wait until all workers have finished.
+ self.terminate_all_workers()
+
+ def shutdown(self):
+ """
+ Terminates all workers and exists the daemon.
+ """
+ if not self.__running:
+ return
+
+ log.info(_("Shutting down..."))
+ self.__running = False
+
+ def spawn_workers_if_needed(self, *args, **kwargs):
+ """
+ Spawns more workers if needed.
+ """
+ # Do not create any more processes when the daemon is shutting down.
+ if not self.__running:
+ return
+
+ # Cleanup all other workers.
+ self.cleanup_workers()
+
+ # Do not create more workers if there are already enough workers
+ # active.
+ if len(self.workers) >= self.max_running:
+ log.warning("More workers running than allowed")
+ return
+
+ # Do nothing, if there is are already enough workers waiting.
+ wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
+ if wanted_waiting_workers <= 0:
+ return
+
+ # Spawn a new worker.
+ for i in range(wanted_waiting_workers):
+ self.spawn_worker(*args, **kwargs)
+
+ def spawn_worker(self, *args, **kwargs):
+ """
+ Spawns a new worker process.
+ """
+ worker = PakfireWorker(config=self.config, *args, **kwargs)
+ worker.start()
+
+ log.debug("Spawned new worker process: %s" % worker)
+ self.__workers.append(worker)
+
+ def terminate_worker(self, worker):
+ """
+ Terminates the given worker.
+ """
+ log.warning(_("Terminating worker process: %s") % worker)
+
+ worker.terminate()
+
+ def terminate_all_workers(self):
+ """
+ Terminates all workers.
+ """
+ for worker in self.workers:
+ self.terminate_worker(worker)
+
+ # Wait until the worker has finished.
+ worker.join()
+
+ def remove_worker(self, worker):
+ """
+ Removes a worker from the internal list of worker processes.
+ """
+ assert not worker.is_alive(), "Remove alive worker?"
+
+ log.debug("Removing worker: %s" % worker)
+ try:
+ self.__workers.remove(worker)
+ except:
+ pass
+
+ def cleanup_workers(self):
+ """
+ Remove workers that are not alive any more.
+ """
+ for worker in self.workers:
+ if worker.is_alive():
+ continue
+
+ self.remove_worker(worker)
+
+ @property
+ def workers(self):
+ return self.__workers[:]
+
+ @property
+ def running_workers(self):
+ workers = []
+
+ for worker in self.workers:
+ if worker.waiting.is_set():
+ continue
+
+ workers.append(worker)
+
+ return workers
+
+ @property
+ def waiting_workers(self):
+ workers = []
+
+ for worker in self.workers:
+ if worker.waiting.is_set():
+ workers.append(worker)
+
+ return workers
+
+ # Signal handling.
+
+ def register_signal_handlers(self):
+ signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
+ signal.signal(signal.SIGINT, self.handle_SIGTERM)
+ signal.signal(signal.SIGTERM, self.handle_SIGTERM)
+
+ def handle_SIGCHLD(self, signum, frame):
+ """
+ Handle signal SIGCHLD.
+ """
+ # Spawn new workers if necessary.
+ self.spawn_workers_if_needed()
+
+ def handle_SIGTERM(self, signum, frame):
+ """
+ Handle signal SIGTERM.
+ """
+ # Just shut down.
+ self.shutdown()
+
+ # Talking to the hub.
+
+ def send_builder_info(self):
+ log.info(_("Sending builder information to hub..."))
+
+ data = {
+ # CPU info
+ "cpu_model" : system.cpu_model,
+ "cpu_count" : system.cpu_count,
+ "cpu_arch" : system.native_arch,
+ "cpu_bogomips" : system.cpu_bogomips,
+
+ # Memory + swap
+ "mem_total" : system.memory,
+ "swap_total" : system.swap_total,
+
+ # Pakfire + OS
+ "pakfire_version" : PAKFIRE_VERSION,
+ "host_key" : self.config.get("signatures", "host_key", None),
+ "os_name" : system.distro.pretty_name,
+
+ # Supported arches
+ "supported_arches" : ",".join(system.supported_arches),
+ }
+ self.transport.post("/builders/info", data=data)
+
+ def send_keepalive(self):
+ log.debug("Sending keepalive message to hub...")
+
+ data = {
+ # Load average.
+ "loadavg1" : system.loadavg1,
+ "loadavg5" : system.loadavg5,
+ "loadavg15" : system.loadavg15,
+
+ # Memory
+ "mem_total" : system.memory_total,
+ "mem_free" : system.memory_free,
+
+ # Swap
+ "swap_total" : system.swap_total,
+ "swap_free" : system.swap_free,
+
+ # Disk space
+ "space_free" : self.free_space,
+ }
+ self.transport.post("/builders/keepalive", data=data)
+
+ @property
+ def free_space(self):
+ mp = system.get_mountpoint(BUILD_ROOT)
+
+ return mp.space_left
+
+
+class PakfireWorker(multiprocessing.Process):
+ def __init__(self, config, waiting=None):
+ multiprocessing.Process.__init__(self)
+
+ # Save config.
+ self.config = config
+
+ # Waiting event. Clear if this worker is running a build.
+ self.waiting = multiprocessing.Event()
+ self.waiting.set()
+
+ # Indicates if this worker is running.
+ self.__running = True
+
+ def run(self):
+ # Register signal handlers.
+ self.register_signal_handlers()
+
+ # Create connection to the hub.
+ self.transport = transport.PakfireHubTransport(self.config)
+ self.transport.fork()
+
+ while self.__running:
+ # Try to get a new build job.
+ job = self.get_new_build_job()
+ if not job:
+ continue
+
+ # If we got a job, we are not waiting anymore.
+ self.waiting.clear()
+
+ # Run the job and return.
+ return self.execute_job(job)
+
+ def shutdown(self):
+ self.__running = False
+
+ # When we are just waiting, we can edit right away.
+ if self.waiting.is_set():
+ log.debug("Exiting immediately")
+ sys.exit(1)
+
+ # XXX figure out what to do, when a build is running.
+
+ # Signal handling.
+
+ def register_signal_handlers(self):
+ signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
+ signal.signal(signal.SIGINT, self.handle_SIGTERM)
+ signal.signal(signal.SIGTERM, self.handle_SIGTERM)
+
+ def handle_SIGCHLD(self, signum, frame):
+ """
+ Handle signal SIGCHLD.
+ """
+ # Must be here so that SIGCHLD won't be propagated to
+ # PakfireDaemon.
+ pass
+
+ def handle_SIGTERM(self, signum, frame):
+ """
+ Handle signal SIGTERM.
+ """
+ self.shutdown()
+
+ def get_new_build_job(self, timeout=600):
+ log.debug("Requesting new job...")
+
+ try:
+ job = self.transport.get_json("/builders/jobs/queue",
+ data={ "timeout" : timeout, }, timeout=timeout)
+
+ if job:
+ return BuildJob(job)
+
+ # As this is a long poll request, it is okay to time out.
+ except TransportMaxTriesExceededError:
+ pass
+
+ def execute_job(self, job):
+ log.debug("Executing job: %s" % job)
+
+ # Call the function that processes the build and try to catch general
+ # exceptions and report them to the server.
+ # If everything goes okay, we tell this the server, too.
+ try:
+ # Create a temporary file and a directory for the resulting files.
+ tmpdir = tempfile.mkdtemp()
+ tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
+ logfile = os.path.join(tmpdir, "build.log")
+
+ # Create pakfire configuration instance.
+ config = pakfire.config.ConfigDaemon()
+ config.parse(job.config)
+
+ # Create pakfire instance.
+ p = None
+ try:
+ p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)
+
+ # Download the source package.
+ grabber = pakfire.downloader.PackageDownloader(p)
+ grabber.urlgrab(job.source_url, filename=tmpfile)
+
+ # Check if the download checksum matches (if provided).
+ if job.source_hash_sha512:
+ h = hashlib.new("sha512")
+ f = open(tmpfile, "rb")
+ while True:
+ buf = f.read(BUFFER_SIZE)
+ if not buf:
+ break
+
+ h.update(buf)
+ f.close()
+
+ if not job.source_hash_sha512 == h.hexdigest():
+ raise DownloadError, "Hash check did not succeed."
+
+ # Create a new instance of a build environment.
+ build = pakfire.builder.BuildEnviron(p, tmpfile,
+ release_build=True, build_id=job.id, logfile=logfile)
+
+ try:
+ # Create the build environment.
+ build.start()
+
+ # Update the build status on the server.
+ self.upload_buildroot(job, build.installed_packages)
+ self.update_state(job, "running")
+
+ # Run the build (without install test).
+ build.build(install_test=False)
+
+ # Copy the created packages to the tempdir.
+ build.copy_result(tmpdir)
+
+ finally:
+ # Cleanup the build environment.
+ build.stop()
+
+ # Jippie, build is finished, we are going to upload the files.
+ self.update_state(job, "uploading")
+
+ # Walk through the result directory and upload all (binary) files.
+ # Skip that for test builds.
+ if not job.type == "test":
+ for dir, subdirs, files in os.walk(tmpdir):
+ for file in files:
+ file = os.path.join(dir, file)
+ if file in (logfile, tmpfile,):
+ continue
+
+ self.upload_file(job, file, "package")
+
+ except DependencyError, e:
+ message = "%s: %s" % (e.__class__.__name__, e)
+ self.update_state(job, "dependency_error", message)
+ raise
+
+ except DownloadError, e:
+ message = "%s: %s" % (e.__class__.__name__, e)
+ self.update_state(job, "download_error", message)
+ raise
+
+ finally:
+ if p:
+ p.destroy()
+
+ # Upload the logfile in any case and if it exists.
+ if os.path.exists(logfile):
+ self.upload_file(job, logfile, "log")
+
+ # Cleanup the files we created.
+ pakfire.util.rm(tmpdir)
+
+ except DependencyError:
+ # This has already been reported.
+ raise
+
+ except (DownloadError,):
+ # Do not take any further action for these exceptions.
+ pass
+
+ except (KeyboardInterrupt, SystemExit):
+ self.update_state(job, "aborted")
+
+ except Exception, e:
+ # Format the exception and send it to the server.
+ message = "%s: %s" % (e.__class__.__name__, e)
+
+ self.update_state(job, "failed", message)
+ raise
+
+ else:
+ self.update_state(job, "finished")
+
+ def update_state(self, job, state, message=None):
+ """
+ Update state of the build job on the hub.
+ """
+ data = {
+ "message" : message or "",
+ }
+
+ self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
+ data=data)
+
+ def upload_file(self, job, filename, type):
+ assert os.path.exists(filename)
+ assert type in ("package", "log")
+
+ # First upload the file data and save the upload_id.
+ upload_id = self.transport.upload_file(filename)
+
+ data = {
+ "type" : type,
+ }
+
+ # Add the file to the build.
+ self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
+ data=data)
+
+ def upload_buildroot(self, job, installed_packages):
+ pkgs = []
+ for pkg in installed_packages:
+ pkgs.append((pkg.friendly_name, pkg.uuid))
+
+ data = { "buildroot" : json.dumps(pkgs) }
+
+ self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)
import logging
log = logging.getLogger("pakfire")
-from system import system
+import system
class Distribution(object):
def __init__(self, data=None):
return
keymap = {
- "NAME" : "name",
- "VERSION_ID" : "release",
+ "NAME" : "name",
+ "PRETTY_NAME" : "pretty_name",
+ "VERSION_ID" : "release",
}
data = {}
def name(self):
return self._data.get("name", "unknown")
+ @property
+ def pretty_name(self):
+ pretty_name = self._data.get("pretty_name", None)
+ if not pretty_name:
+ pretty_name = " ".join((self.name, self.release))
+
+ return pretty_name
+
@property
def release(self):
return self._data.get("release", "0")
return self._data.get("contact", "N/A")
def get_arch(self):
- arch = self._data.get("arch", None) or system.arch
+ arch = self._data.get("arch", None) or system.system.arch
# We can not set up a build environment for noarch.
if arch == "noarch":
- arch = system.arch
+ arch = system.system.arch
return arch
None to skip the setting of the personality in the build chroot.
"""
- if self.arch == system.native_arch:
+ if self.arch == system.system.native_arch:
return None
arch2personality = {
import json
import os
+import pycurl
import random
import logging
from config import _Config
+import urlgrabber.grabber
from urlgrabber.grabber import URLGrabber, URLGrabError
from urlgrabber.mirror import MirrorGroup
from urlgrabber.progress import TextMeter
URLGrabber.__init__(self, *args, **kwargs)
+ def fork(self):
+ """
+ Reset Curl object after forking a process.
+ """
+ # XXX this is a very ugly hack and fiddles around with the internals
+ # or urlgrabber. We should not touch these, but apparently nobody
+ # else uses multiple threads or processes to talk to their servers.
+ # So we simply replace Curl with a new instance without closing
+ # the old one. This should be fixed in urlgrabber and/or pycurl.
+ urlgrabber.grabber._curl_cache = pycurl.Curl()
+
def check_offline_mode(self):
offline = self.config.get("downloader", "offline")
if not offline:
message = _("Transaction test was not successful")
+class TransportError(Error):
+ pass
+
+
+class TransportConnectionError(TransportError):
+ pass
+
+
+class TransportConnectionDNSError(TransportConnectionError):
+ pass
+
+
+class TransportConnectionProxyError(TransportConnectionError):
+ pass
+
+
+class TransportConnectionReadError(TransportConnectionError):
+ pass
+
+
+class TransportConnectionResetError(TransportConnectionError):
+ pass
+
+
+class TransportConnectionTimeoutError(TransportConnectionError):
+ pass
+
+
+class TransportConnectionWriteError(TransportConnectionError):
+ pass
+
+
+class TransportSSLError(TransportConnectionError):
+ pass
+
+
+class TransportSSLCertificateExpiredError(TransportSSLError):
+ pass
+
+
+class TransportInternalServerError(TransportError):
+ pass
+
+
+class TransportForbiddenError(TransportError):
+ pass
+
+
+class TransportMaxTriesExceededError(TransportError):
+ pass
+
+
+class TransportNotFoundError(TransportError):
+ pass
+
+
class XMLRPCError(Error):
message = _("Generic XMLRPC error.")
import os
import socket
+import distro
+
from i18n import _
class System(object):
return hn
+ @property
+ def distro(self):
+ if not hasattr(self, "_distro"):
+ self._distro = distro.Distribution()
+
+ return self._distro
+
@property
def native_arch(self):
"""
"""
return multiprocessing.cpu_count()
- @property
- def cpu_model(self):
- # Determine CPU model
- cpuinfo = {}
+ def parse_cpuinfo(self):
+ ret = {}
+
with open("/proc/cpuinfo") as f:
for line in f.readlines():
- # Break at an empty line, because all information after that
- # is redundant.
- if not line:
+ # Only parse the first block.
+ if line == "\n":
break
try:
- key, value = line.split(":")
+ # Split the lines by colons.
+ a, b = line.split(":")
+
+ # Strip whitespace.
+ a = a.strip()
+ b = b.strip()
+
+ ret[a] = b
except:
- pass # Skip invalid lines
+ pass
- key, value = key.strip(), value.strip()
- cpuinfo[key] = value
+ return ret
+
+ @property
+ def cpu_model(self):
+ cpuinfo = self.parse_cpuinfo()
ret = None
if self.arch.startswith("arm"):
return ret or _("Could not be determined")
@property
- def memory(self):
- # Determine memory size
- memory = 0
+ def cpu_bogomips(self):
+ cpuinfo = self.parse_cpuinfo()
+
+ bogomips = cpuinfo.get("bogomips", None)
+ try:
+ return float(bogomips) * self.cpu_count
+ except:
+ pass
+
+ def get_loadavg(self):
+ return os.getloadavg()
+
+ @property
+ def loadavg1(self):
+ return self.get_loadavg()[0]
+
+ @property
+ def loadavg5(self):
+ return self.get_loadavg()[1]
+
+ @property
+ def loadavg15(self):
+ return self.get_loadavg()[2]
+
+ def has_overload(self):
+ """
+ Checks, if the load average is not too high.
+
+ On this is to be decided if a new job is taken.
+ """
+ # If there are more than 2 processes in the process queue per CPU
+ # core we will assume that the system has heavy load and to not request
+ # a new job.
+ return self.loadavg5 >= self.cpu_count * 2
+
+ def parse_meminfo(self):
+ ret = {}
+
with open("/proc/meminfo") as f:
- line = f.readline()
+ for line in f.readlines():
+ try:
+ a, b, c = line.split()
- try:
- a, b, c = line.split()
- except:
- pass
- else:
- memory = int(b) * 1024
+ a = a.strip()
+ a = a.replace(":", "")
+ b = int(b)
+
+ ret[a] = b * 1024
+ except:
+ pass
+
+ return ret
+
+ @property
+ def memory_total(self):
+ meminfo = self.parse_meminfo()
+
+ return meminfo.get("MemTotal", None)
+
+ # For compatibility
+ memory = memory_total
+
+ @property
+ def memory_free(self):
+ meminfo = self.parse_meminfo()
+
+ return meminfo.get("MemFree", None)
+
+ @property
+ def swap_total(self):
+ meminfo = self.parse_meminfo()
+
+ return meminfo.get("SwapTotal", None)
+
+ @property
+ def swap_free(self):
+ meminfo = self.parse_meminfo()
- return memory
+ return meminfo.get("SwapFree", None)
def get_mountpoint(self, path):
return Mountpoint(path)
--- /dev/null
+#!/usr/bin/python
+###############################################################################
+# #
+# Pakfire - The IPFire package management system #
+# Copyright (C) 2013 Pakfire development team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+from __future__ import division
+
+import base64
+import hashlib
+import json
+import os
+import time
+import urlgrabber
+import urllib
+import urlparse
+
+import pakfire.downloader
+import pakfire.util
+
+from pakfire.constants import *
+from pakfire.i18n import _
+
+import logging
+log = logging.getLogger("pakfire.transport")
+
+
+class PakfireHubTransportUploader(object):
+ """
+ Handles the upload of a single file to the hub.
+ """
+
+ def __init__(self, transport, filename):
+ self.transport = transport
+ self.filename = filename
+
+ def get_upload_id(self):
+ """
+ Gets an upload from the pakfire hub.
+ """
+ # Calculate the SHA1 sum of the file to upload.
+ h = hashlib.new("sha1")
+ with open(self.filename, "rb") as f:
+ while True:
+ buf = f.read(CHUNK_SIZE)
+ if not buf:
+ break
+
+ h.update(buf)
+
+ data = {
+ "filename" : os.path.basename(self.filename),
+ "filesize" : os.path.getsize(self.filename),
+ "hash" : h.hexdigest(),
+ }
+
+ upload_id = self.transport.get("/uploads/create", data=data)
+ log.debug("Got upload id: %s" % upload_id)
+
+ return upload_id
+
+ def send_file(self, upload_id, progress_callback=None):
+ """
+ Sends the file content to the server.
+
+ The data is splitted into chunks, which are
+ sent one after an other.
+ """
+ with open(self.filename, "rb") as f:
+ # Initial chunk size.
+ chunk_size = CHUNK_SIZE
+
+ # Count the already transmitted bytes.
+ transferred = 0
+
+ while True:
+ chunk = f.read(chunk_size)
+ if not chunk:
+ break
+
+ log.debug("Got chunk of %s bytes" % len(chunk))
+
+ # Save the time when we started to send this bit.
+ time_started = time.time()
+
+ # Send the chunk to the server.
+ self.send_chunk(upload_id, chunk)
+
+ # Save the duration.time after the chunk has been transmitted
+ # and adjust chunk size to send one chunk per second.
+ duration = time.time() - time_started
+ chunk_size = int(chunk_size / duration)
+
+ # Never let chunk_size drop under CHUNK_SIZE:
+ if chunk_size < CHUNK_SIZE:
+ chunk_size = CHUNK_SIZE
+
+ # Add up the send amount of data.
+ transferred += len(chunk)
+ if progress_callback:
+ progress_callback(transferred)
+
+ def send_chunk(self, upload_id, data):
+ """
+ Sends a piece of the file to the server.
+ """
+ # Calculate checksum over the chunk data.
+ h = hashlib.new("sha512")
+ h.update(data)
+ chksum = h.hexdigest()
+
+ # Encode data in base64.
+ data = base64.b64encode(data)
+
+ # Send chunk data to the server.
+ self.transport.post("/uploads/%s/sendchunk" % upload_id,
+ data={ "chksum" : chksum, "data" : data })
+
+ def destroy_upload(self, upload_id):
+ """
+ Destroys the upload on the server.
+ """
+ self.transport.get("/uploads/%s/destroy" % upload_id)
+
+ def finish_upload(self, upload_id):
+ """
+ Signals to the server, that the upload has finished.
+ """
+ self.transport.get("/uploads/%s/finished" % upload_id)
+
+ def run(self):
+ upload_id = None
+
+ # Create a progress bar.
+ progress = pakfire.util.make_progress(
+ os.path.basename(self.filename), os.path.getsize(self.filename), speed=True, eta=True,
+ )
+
+ try:
+ # Get an upload ID.
+ upload_id = self.get_upload_id()
+
+ # Send the file content.
+ self.send_file(upload_id, progress_callback=progress.update)
+
+ except:
+ progress.finish()
+
+ # Remove broken upload from server.
+ if upload_id:
+ self.destroy_upload(upload_id)
+
+ # XXX catch fatal errors
+ raise
+
+ else:
+ progress.finish()
+
+ # If no exception was raised, the upload
+ # has finished.
+ self.finish_upload(upload_id)
+
+ # Return the upload id so some code can actually do something
+ # with the file on the server.
+ return upload_id
+
+
+class PakfireHubTransport(object):
+ """
+ Connection to the pakfire hub.
+ """
+
+ def __init__(self, config):
+ self.config = config
+
+ # Create connection to the hub.
+ self.grabber = pakfire.downloader.PakfireGrabber(
+ self.config, prefix=self.url,
+ )
+
+ def fork(self):
+ return self.grabber.fork()
+
+ @property
+ def url(self):
+ """
+ Construct a right URL out of the given
+ server, username and password.
+
+ Basicly this just adds the credentials
+ to the URL.
+ """
+ # Get credentials.
+ server, username, password = self.config.get_hub_credentials()
+
+ # Parse the given URL.
+ url = urlparse.urlparse(server)
+ assert url.scheme in ("http", "https")
+
+ # Build new URL.
+ ret = "%s://" % url.scheme
+
+ # Add credentials if provided.
+ if username and password:
+ ret += "%s:%s@" % (username, password)
+
+ # Add path components.
+ ret += url.netloc
+
+ return ret
+
+ def one_request(self, url, **kwargs):
+ try:
+ return self.grabber.urlread(url, **kwargs)
+
+ except urlgrabber.grabber.URLGrabError, e:
+ # Timeout
+ if e.errno == 12:
+ raise TransportConnectionTimeoutError, e
+
+ # Handle common HTTP errors
+ elif e.errno == 14:
+ # Connection errors
+ if e.code == 5:
+ raise TransportConnectionProxyError, url
+ elif e.code == 6:
+ raise TransportConnectionDNSError, url
+ elif e.code == 7:
+ raise TransportConnectionResetError, url
+ elif e.code == 23:
+ raise TransportConnectionWriteError, url
+ elif e.code == 26:
+ raise TransportConnectionReadError, url
+
+ # SSL errors
+ elif e.code == 52:
+ raise TransportSSLCertificateExpiredError, url
+
+ # HTTP error codes
+ elif e.code == 403:
+ raise TransportForbiddenError, url
+ elif e.code == 404:
+ raise TransportNotFoundError, url
+ elif e.code == 500:
+ raise TransportInternalServerError, url
+
+ # All other exceptions...
+ raise
+
+ def request(self, url, tries=None, **kwargs):
+ # tries = None implies wait infinitely
+
+ while tries or tries is None:
+ if tries:
+ tries -= 1
+
+ try:
+ return self.one_request(url, **kwargs)
+
+ # 500 - Internal Server Error
+ except TransportInternalServerError, e:
+ log.exception("%s" % e.__class__.__name__)
+
+ # Wait a minute before trying again.
+ time.sleep(60)
+
+ # Retry on connection problems.
+ except TransportConnectionError, e:
+ log.exception("%s" % e.__class__.__name__)
+
+ # Wait for 10 seconds.
+ time.sleep(10)
+
+ raise TransportMaxTriesExceededError
+
+ def escape_args(self, **kwargs):
+ return urllib.urlencode(kwargs)
+
+ def get(self, url, data={}, **kwargs):
+ """
+ Sends a HTTP GET request to the given URL.
+
+ All given keyword arguments are considered as form data.
+ """
+ params = self.escape_args(**data)
+
+ if params:
+ url = "%s?%s" % (url, params)
+
+ return self.request(url, **kwargs)
+
+ def post(self, url, data={}, **kwargs):
+ """
+ Sends a HTTP POST request to the given URL.
+
+ All keyword arguments are considered as form data.
+ """
+ params = self.escape_args(**data)
+ if params:
+ kwargs.update({
+ "data" : params,
+ })
+
+ return self.request(url, **kwargs)
+
+ def upload_file(self, filename):
+ """
+ Uploads the given file to the server.
+ """
+ uploader = PakfireHubTransportUploader(self, filename)
+ upload_id = uploader.run()
+
+ return upload_id
+
+ def get_json(self, *args, **kwargs):
+ res = self.get(*args, **kwargs)
+
+ # Decode JSON.
+ if res:
+ return json.loads(res)
+
+ ### Misc. actions
+
+ def noop(self):
+ """
+ No operation. Just to check if the connection is
+ working. Returns a random number.
+ """
+ return self.get("/noop")
+
+ def test_code(self, error_code):
+ assert error_code >= 100 and error_code <= 999
+
+ self.get("/error/test/%s" % error_code)
+
+ # Build actions
+
+ def build_create(self, filename, build_type, arches=None, distro=None):
+ """
+ Create a new build on the hub.
+ """
+ assert build_type in ("scratch", "release")
+
+ # XXX Check for permission to actually create a build.
+
+ # Upload the source file to the server.
+ upload_id = self.upload_file(filename)
+
+ data = {
+ "arches" : ",".join(arches or []),
+ "build_type" : build_type,
+ "distro" : distro or "",
+ "upload_id" : upload_id,
+ }
+
+ # Then create the build.
+ build_id = self.get("/builds/create", data=data)
+
+ return build_id or None
+
+ def build_get(self, build_uuid):
+ return self.get_json("/builds/%s" % build_uuid)
+
+ # Job actions
+
+ def job_get(self, job_uuid):
+ return self.get_json("/jobs/%s" % job_uuid)
+
+ # Package actions
+
+ def package_get(self, package_uuid):
+ return self.get_json("/packages/%s" % package_uuid)