From aa14071dab49bbc167db302ad2130d3f59a390c7 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Thu, 28 Feb 2013 12:35:33 +0100 Subject: [PATCH] Update pakfire-daemon: Build some sort of preforked worker model like Apache does. The XMLRPC backend has been replaced by a proper API for which urlgrabber is used. --- Makeconfig | 2 +- python/pakfire/cli.py | 31 +- python/pakfire/client.py | 83 +++++ python/pakfire/client/__init__.py | 4 - python/pakfire/client/base.py | 249 -------------- python/pakfire/client/builder.py | 497 ---------------------------- python/pakfire/client/transport.py | 158 --------- python/pakfire/config.py | 14 + python/pakfire/daemon.py | 509 +++++++++++++++++++++++++++++ python/pakfire/distro.py | 21 +- python/pakfire/downloader.py | 13 + python/pakfire/errors.py | 56 ++++ python/pakfire/system.py | 126 +++++-- python/pakfire/transport.py | 387 ++++++++++++++++++++++ 14 files changed, 1200 insertions(+), 950 deletions(-) create mode 100644 python/pakfire/client.py delete mode 100644 python/pakfire/client/__init__.py delete mode 100644 python/pakfire/client/base.py delete mode 100644 python/pakfire/client/builder.py delete mode 100644 python/pakfire/client/transport.py create mode 100644 python/pakfire/daemon.py create mode 100644 python/pakfire/transport.py diff --git a/Makeconfig b/Makeconfig index e717238f..8354a692 100644 --- a/Makeconfig +++ b/Makeconfig @@ -33,7 +33,7 @@ endif PYTHON_CC = $(CC) -pthread -fPIC PYTHON_CFLAGS = $(shell python-config --cflags) -PYTHON_MODULES = pakfire pakfire/client pakfire/packages pakfire/repository +PYTHON_MODULES = pakfire pakfire/packages pakfire/repository ifeq "$(DEBIAN)" "1" PYTHON_DIR = $(LIBDIR)/python$(PYTHON_VERSION)/dist-packages else diff --git a/python/pakfire/cli.py b/python/pakfire/cli.py index 615aacdb..232aad87 100644 --- a/python/pakfire/cli.py +++ b/python/pakfire/cli.py @@ -29,6 +29,7 @@ import tempfile import base import client import config +import daemon import logger import 
packages import repository @@ -879,15 +880,11 @@ class CliClient(Cli): "test" : self.handle_test, } - # Read configuration for the pakfire client. + # Read configuration. self.config = config.ConfigClient() # Create connection to pakfire hub. - self.client = client.PakfireUserClient( - self.config.get("client", "server"), - self.config.get("client", "username"), - self.config.get("client", "password"), - ) + self.client = client.PakfireClient(self.config) @property def pakfire_args(self): @@ -989,21 +986,23 @@ class CliClient(Cli): # Format arches. if self.args.arch: - arches = self.args.arch.replace(",", " ") + arches = self.args.arch.split(",") else: arches = None # Create a new build on the server. - build = self.client.build_create(package, arches=arches) - - # XXX Print the resulting build. - print build + build_id = self.client.build_create(package, build_type="scratch", + arches=arches) finally: # Cleanup the temporary directory and all files. if os.path.exists(temp_dir): shutil.rmtree(temp_dir, ignore_errors=True) + # Monitor the build. + if build_id: + self.watch_build(build_id) + def handle_info(self): ret = [] @@ -1183,6 +1182,11 @@ class CliClient(Cli): res = self.client.test_code(error_code) print _("Reponse from the server: %s") % res + def watch_build(self, build_id): + print self.client.build_get(build_id) + # XXX TODO + print build_id + class CliDaemon(Cli): def __init__(self): @@ -1200,10 +1204,11 @@ class CliDaemon(Cli): Runs the pakfire daemon with provided settings. """ # Read the configuration file for the daemon. - conf = config.ConfigDaemon() + self.config = config.ConfigDaemon() + logger.setup_logging(self.config) # Create daemon instance. 
- d = client.PakfireDaemon() + d = daemon.PakfireDaemon(self.config) try: d.run() diff --git a/python/pakfire/client.py b/python/pakfire/client.py new file mode 100644 index 00000000..c1f7ba65 --- /dev/null +++ b/python/pakfire/client.py @@ -0,0 +1,83 @@ +#!/usr/bin/python +############################################################################### +# # +# Pakfire - The IPFire package management system # +# Copyright (C) 2013 Pakfire development team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +############################################################################### + +import transport + +from pakfire.constants import * +from pakfire.i18n import _ + +import logging +log = logging.getLogger("pakfire.client") + +class PakfireClient(object): + def __init__(self, config): + self.config = config + + # Create connection to the hub. 
+ self.transport = transport.PakfireHubTransport(self.config) + + def build_create(self, *args, **kwargs): + return self.transport.build_create(*args, **kwargs) + + def build_get(self, *args, **kwargs): + return self.transport.build_get(*args, **kwargs) + + def job_get(self, *args, **kwargs): + return self.transport.job_get(*args, **kwargs) + + def package_get(self, *args, **kwargs): + return self.transport.package_get(*args, **kwargs) + +# XXX OLD CODE + +class PakfireUserClient(PakfireClient): + type = "user" + + def check_auth(self): + """ + Check if the user was successfully authenticated. + """ + return self.conn.check_auth() + + def get_user_profile(self): + """ + Get information about the user profile. + """ + return self.conn.get_user_profile() + + def get_builds(self, type=None, limit=10, offset=0): + return self.conn.get_builds(type=type, limit=limit, offset=offset) + + def get_build(self, build_id): + return self.conn.get_build(build_id) + + def get_builder(self, builder_id): + return self.conn.get_builder(builder_id) + + def get_job(self, job_id): + return self.conn.get_job(job_id) + + def get_latest_jobs(self): + return self.conn.get_latest_jobs() + + def get_active_jobs(self): + return self.conn.get_active_jobs() + diff --git a/python/pakfire/client/__init__.py b/python/pakfire/client/__init__.py deleted file mode 100644 index 64adfd01..00000000 --- a/python/pakfire/client/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/python - -from base import PakfireUserClient, PakfireBuilderClient -from builder import PakfireDaemon, ClientBuilder diff --git a/python/pakfire/client/base.py b/python/pakfire/client/base.py deleted file mode 100644 index cd661ca0..00000000 --- a/python/pakfire/client/base.py +++ /dev/null @@ -1,249 +0,0 @@ -#!/usr/bin/python - -from __future__ import division - -import os -import socket -import urlparse -import xmlrpclib - -import pakfire.util -import pakfire.packages as packages -from pakfire.system import system - -# Local 
modules. -import transport - -from pakfire.constants import * -from pakfire.i18n import _ - -import logging -log = logging.getLogger("pakfire.client") - -class PakfireClient(object): - type = None - - def __init__(self, server, username, password): - self.url = self._join_url(server, username, password) - - # Create a secure XMLRPC connection to the server. - self.conn = transport.Connection(self.url) - - def _join_url(self, server, username, password): - """ - Construct a right URL out of the given - server, username and password. - - Basicly this just adds the credentials - to the URL. - """ - assert self.type - - # Parse the given URL. - url = urlparse.urlparse(server) - assert url.scheme in ("http", "https") - - # Build new URL. - ret = "%s://" % url.scheme - - # Add credentials if provided. - if username and password: - ret += "%s:%s@" % (username, password) - - # Add host and path components. - ret += "/".join((url.netloc, self.type)) - - return ret - - ### Misc. actions - - def noop(self): - """ - No operation. Just to check if the connection is - working. Returns a random number. - """ - return self.conn.noop() - - def test_code(self, error_code): - assert error_code >= 100 and error_code <= 999 - - return self.conn.test_code(error_code) - - def get_my_address(self): - """ - Get my own address (as seen by the hub). - """ - return self.conn.get_my_address() - - def get_hub_status(self): - """ - Get some status information about the hub. - """ - return self.conn.get_hub_status() - - -class BuildMixin(object): - ### Build actions - - def build_create(self, filename, arches=None, distro=None): - """ - Create a new build on the hub. - """ - - # Upload the source file to the server. - upload_id = self._upload_file(filename) - - # Then create the build. - build = self.conn.build_create(upload_id, distro, arches) - - print build - - def _upload_file(self, filename): - # Get the hash of the file. 
- hash = pakfire.util.calc_hash1(filename) - - # Get the size of the file. - size = os.path.getsize(filename) - - # Get an upload ID from the server. - upload_id = self.conn.upload_create(os.path.basename(filename), - size, hash) - - # Make a nice progressbar. - pb = pakfire.util.make_progress(os.path.basename(filename), size, speed=True, eta=True) - - try: - # Calculate the number of chunks. - chunks = (size // CHUNK_SIZE) + 1 - transferred = 0 - - # Cut the file in pieces and upload them one after another. - with open(filename) as f: - chunk = 0 - while True: - data = f.read(CHUNK_SIZE) - if not data: - break - - chunk += 1 - if pb: - transferred += len(data) - pb.update(transferred) - - data = xmlrpclib.Binary(data) - self.conn.upload_chunk(upload_id, data) - - # Tell the server, that we finished the upload. - ret = self.conn.upload_finished(upload_id) - - except: - # If anything goes wrong, try to delete the upload and raise - # the exception. - self.conn.upload_remove(upload_id) - - raise - - finally: - if pb: - pb.finish() - - # If the server sends false, something happened with the upload that - # could not be recovered. - if not ret: - logging.error("Upload of %s was not successful." % filename) - raise Exception, "Upload failed." - - return upload_id - - -class PakfireUserClient(BuildMixin, PakfireClient): - type = "user" - - def check_auth(self): - """ - Check if the user was successfully authenticated. - """ - return self.conn.check_auth() - - def get_user_profile(self): - """ - Get information about the user profile. 
- """ - return self.conn.get_user_profile() - - def get_builds(self, type=None, limit=10, offset=0): - return self.conn.get_builds(type=type, limit=limit, offset=offset) - - def get_build(self, build_id): - return self.conn.get_build(build_id) - - def get_builder(self, builder_id): - return self.conn.get_builder(builder_id) - - def get_job(self, job_id): - return self.conn.get_job(job_id) - - def get_latest_jobs(self): - return self.conn.get_latest_jobs() - - def get_active_jobs(self): - return self.conn.get_active_jobs() - - -class PakfireBuilderClient(BuildMixin, PakfireClient): - type = "builder" - - def send_keepalive(self, force=False, overload=None, free_space=None): - """ - Sends a little keepalive to the server and - updates the hardware information if the server - requests it. - """ - log.debug("Sending keepalive to the hub.") - - # Collect the current loadavg and send it to the hub. - loadavg = ", ".join(("%.2f" % round(l, 2) for l in os.getloadavg())) - - try: - needs_update = self.conn.send_keepalive(loadavg, overload, free_space) - - except XMLRPCInternalServerError: - # If the keepalive message could not successfully be sent, we don't - # bother, because the client will soon retry. - log.warning(_("Could not send a keepalive message to the hub.")) - - return - - if force or needs_update: - log.debug("The hub is requesting an update.") - self.send_update() - - def send_update(self): - log.info("Sending host information update to hub...") - - config = pakfire.config.ConfigDaemon() - - try: - self.conn.send_update( - # Supported architectures. - system.supported_arches, - - # CPU information. - system.cpu_model, - system.cpu_count, - - # Amount of memory in bytes. - system.memory / 1024, - - # Send the currently running version of Pakfire. - PAKFIRE_VERSION, - - # Send the host key. - config.get("signatures", "host_key", None), - ) - - except XMLRPCInternalServerError: - # Don't give a shit either. 
- log.warning(_("Could not update the host information.")) - - return diff --git a/python/pakfire/client/builder.py b/python/pakfire/client/builder.py deleted file mode 100644 index 9aefa1c4..00000000 --- a/python/pakfire/client/builder.py +++ /dev/null @@ -1,497 +0,0 @@ -#!/usr/bin/python - -import hashlib -import multiprocessing -import os -import sys -import tempfile -import time - -import pakfire.base -import pakfire.builder -import pakfire.config -import pakfire.downloader -import pakfire.system -import pakfire.util -from pakfire.system import system - -import base - -from pakfire.constants import * -from pakfire.i18n import _ - -import logging -log = logging.getLogger("pakfire.client") - -def fork_builder(*args, **kwargs): - """ - Wrapper that runs ClientBuilder in a new process and catches - any exception to report it to the main process. - """ - try: - # Create new instance of the builder. - cb = ClientBuilder(*args, **kwargs) - - # Run the build: - cb.build() - - except Exception, e: - # XXX catch the exception and log it. - print e - - # End the process with an exit code. - sys.exit(1) - - -class PakfireDaemon(object): - """ - The PakfireDaemon class that creates a a new process per build - job and also handles the keepalive/abort stuff. - """ - def __init__(self): - self.config = pakfire.config.ConfigDaemon() - - server = self.config.get("daemon", "server") - hostname = self.config.get("daemon", "hostname") - secret = self.config.get("daemon", "secret") - - self.client = base.PakfireBuilderClient(server, hostname, secret) - self.conn = self.client.conn - - # Save login data (to create child processes). - self.server = server - self.hostname = hostname - self.__secret = secret - - # A list with all running processes. - self.processes = [] - self.pid2jobid = {} - - # Save when last keepalive was sent. - self._last_keepalive = 0 - - # Send an initial keepalive message. 
- self.send_keepalive(force=True) - - def run(self, heartbeat=1, max_processes=None): - # By default do not start more than two processes per CPU core. - if max_processes is None: - max_processes = system.cpu_count * 2 - log.debug("Maximum number of simultaneous processes is: %s" % max_processes) - - # Indicates when to try to request a new job or aborted builds. - last_job_request = 0 - last_abort_request = 0 - - # Main loop. - while True: - # Send the keepalive regularly. - self.send_keepalive() - - # Remove all finished builds. - # "removed" indicates, if a process has actually finished. - removed = self.remove_finished_builders() - - # If a build slot was freed, search immediately for a new job. - if removed: - last_job_request = 0 - - # Kill aborted jobs. - if time.time() - last_abort_request >= 60: - aborted = self.kill_aborted_jobs() - - # If a build slot was freed, search immediately for a new job. - if aborted: - last_job_request = 0 - - last_abort_request = time.time() - - # Check if the maximum number of processes was reached. - # Actually the hub does manage this but this is an emergency - # condition if anything goes wrong. - if self.num_processes >= max_processes: - log.debug("Reached maximum number of allowed processes (%s)." % max_processes) - - time.sleep(heartbeat) - continue - - # Get new job. - if time.time() - last_job_request >= 60 and not self.has_overload(): - # If the last job request is older than a minute and we don't - # have too much load, we go and check if there is something - # to do for us. - job = self.get_job() - - # If we got a job, we start a child process to work on it. - if job: - log.debug("Got a new job.") - self.fork_builder(job) - else: - log.debug("No new job.") - - # Update the time when we requested a job. - last_job_request = time.time() - - # Wait a moment before starting over. - time.sleep(heartbeat) - - def shutdown(self): - """ - Shut down the daemon. - This means to kill all child processes. 
- - The method blocks until all processes are shut down. - """ - for process in self.processes: - log.info("Sending %s to terminate..." % process) - - process.terminate() - else: - log.info("No processes to kill. Shutting down immediately.") - - while self.processes: - log.debug("%s process(es) is/are still running..." % len(self.processes)) - - for process in self.processes[:]: - if not process.is_alive(): - # The process has terminated. - log.info("Process %s terminated with exit code: %s" % \ - (process, process.exitcode)) - - self.processes.remove(process) - - @property - def num_processes(self): - # Return the number of processes. - return len(self.processes) - - @property - def free_space(self): - mp = system.get_mountpoint(BUILD_ROOT) - - return mp.space_left - - def get_job(self): - """ - Get a build job from the hub. - """ - if not self.free_space >= 2 * 1024**3: - log.warning(_("Less than 2GB of free space. Cannot request a new job.")) - return - - log.info("Requesting a new job from the server...") - - # Get some information about this system. - s = pakfire.system.System() - - # Fetch a build job from the hub. - return self.client.conn.build_get_job(s.supported_arches) - - def has_overload(self): - """ - Checks, if the load average is not too high. - - On this is to be decided if a new job is taken. - """ - try: - load1, load5, load15 = os.getloadavg() - except OSError: - # Could not determine the current loadavg. In that case we - # assume that we don't have overload. - return False - - # If there are more than 2 processes in the process queue per CPU - # core we will assume that the system has heavy load and to not request - # a new job. - return load5 >= system.cpu_count * 2 - - def send_keepalive(self, force=False): - """ - When triggered, this method sends a keepalive to the hub. - """ - # Do not send a keepalive more often than twice a minute. 
- if time.time() - self._last_keepalive < 30: - return - - kwargs = { - "force" : force, - "overload" : self.has_overload(), - "free_space" : self.free_space / 1024**2, - } - - self.client.send_keepalive(**kwargs) - self._last_keepalive = time.time() - - def remove_finished_builders(self): - # Return if any processes have been removed. - ret = False - - # Search for any finished processes. - for process in self.processes[:]: - # If the process is not alive anymore... - if not process.is_alive(): - ret = True - - # ... check the exit code and log a message on errors. - if process.exitcode == 0: - log.debug("Process %s exited normally." % process) - - elif process.exitcode > 0: - log.error("Process did not exit normally: %s code: %s" \ - % (process, process.exitcode)) - - elif process.exitcode < 0: - log.error("Process killed by signal: %s: code: %s" \ - % (process, process.exitcode)) - - # If a program has crashed, we send that to the hub. - job_id = self.pid2jobid.get(process.pid, None) - if job_id: - self.conn.build_job_crashed(job_id, process.exitcode) - - # Finally, remove the process from the process list. - self.processes.remove(process) - - return ret - - def kill_aborted_jobs(self): - log.debug("Requesting aborted jobs...") - - # Get a list of running job ids: - running_jobs = self.pid2jobid.values() - - # If there are no running jobs, there is nothing to do. - if not running_jobs: - return False - - # Ask the hub for any build jobs to abort. - aborted_jobs = self.conn.build_jobs_aborted(running_jobs) - - # If no build jobs were aborted, there is nothing to do. - if not aborted_jobs: - return False - - for process in self.processes[:]: - job_id = self.pid2jobid.get(process.pid, None) - if job_id and job_id in aborted_jobs: - - # Kill the process. - log.info("Killing process %s which was aborted by the user." \ - % process.pid) - process.terminate() - - # Remove the process from the process list to avoid - # that is will be cleaned up in the normal way. 
- self.processes.remove(process) - - return True - - def fork_builder(self, job): - """ - For a new child process to create a new independent builder. - """ - # Create the Process object. - process = multiprocessing.Process(target=fork_builder, - args=(self.server, self.hostname, self.__secret, job)) - # The process is running in daemon mode so it will try to kill - # all child processes when exiting. - process.daemon = True - - # Start the process. - process.start() - log.info("Started new process %s with PID %s." % (process, process.pid)) - - # Save the PID and the build id to track down - # crashed builds. - self.pid2jobid[process.pid] = job.get("id", None) - - # Append it to the process list. - self.processes.append(process) - - -class ClientBuilder(object): - def __init__(self, server, hostname, secret, job): - self.client = base.PakfireBuilderClient(server, hostname, secret) - self.conn = self.client.conn - - # Store the information sent by the server here. - self.build_job = job - - def update_state(self, state, message=None): - self.conn.build_job_update_state(self.build_id, state, message) - - def upload_file(self, filename, type): - assert os.path.exists(filename) - assert type in ("package", "log") - - # First upload the file data and save the upload_id. - upload_id = self.client._upload_file(filename) - - # Add the file to the build. 
- return self.conn.build_job_add_file(self.build_id, upload_id, type) - - def upload_buildroot(self, installed_packages): - pkgs = [] - - for pkg in installed_packages: - assert pkg.uuid, "%s has got no UUID" - pkgs.append((pkg.friendly_name, pkg.uuid)) - - return self.conn.build_upload_buildroot(self.build_id, pkgs) - - @property - def build_id(self): - if self.build_job: - return self.build_job.get("id", None) - - @property - def build_arch(self): - if self.build_job: - return self.build_job.get("arch", None) - - @property - def build_source_url(self): - if self.build_job: - return self.build_job.get("source_url", None) - - @property - def build_source_filename(self): - if self.build_source_url: - return os.path.basename(self.build_source_url) - - @property - def build_source_hash512(self): - if self.build_job: - return self.build_job.get("source_hash512", None) - - @property - def build_type(self): - if self.build_job: - return self.build_job.get("type", None) - - @property - def build_config(self): - if self.build_job: - return self.build_job.get("config", None) - - def build(self): - # Cannot go on if I got no build job. - if not self.build_job: - logging.info("No job to work on...") - return - - # Call the function that processes the build and try to catch general - # exceptions and report them to the server. - # If everything goes okay, we tell this the server, too. - try: - # Create a temporary file and a directory for the resulting files. - tmpdir = tempfile.mkdtemp() - tmpfile = os.path.join(tmpdir, self.build_source_filename) - logfile = os.path.join(tmpdir, "build.log") - cfgfile = os.path.join(tmpdir, "job-%s.conf" % self.build_id) - - # Get a package grabber and add mirror download capabilities to it. - grabber = pakfire.downloader.PackageDownloader(pakfire.config.Config()) - - # Create pakfire configuration instance. - config = pakfire.config.ConfigDaemon() - config.parse(self.build_config) - - # Create pakfire instance. 
- p = None - try: - p = pakfire.base.PakfireBuilder(config=config, arch=self.build_arch) - - # Download the source package. - grabber = pakfire.downloader.PackageDownloader(p) - grabber.urlgrab(self.build_source_url, filename=tmpfile) - - # Check if the download checksum matches (if provided). - if self.build_source_hash512: - h = hashlib.new("sha512") - f = open(tmpfile, "rb") - while True: - buf = f.read(BUFFER_SIZE) - if not buf: - break - - h.update(buf) - f.close() - - if not self.build_source_hash512 == h.hexdigest(): - raise DownloadError, "Hash check did not succeed." - - # Create a new instance of a build environment. - build = pakfire.builder.BuildEnviron(p, tmpfile, - release_build=True, build_id=self.build_id, logfile=logfile) - - try: - # Create the build environment. - build.start() - - # Update the build status on the server. - self.upload_buildroot(build.installed_packages) - self.update_state("running") - - # Run the build (with install test). - build.build(install_test=True) - - # Copy the created packages to the tempdir. - build.copy_result(tmpdir) - - finally: - # Cleanup the build environment. - build.stop() - - # Jippie, build is finished, we are going to upload the files. - self.update_state("uploading") - - # Walk through the result directory and upload all (binary) files. - # Skip that for test builds. - if not self.build_type == "test": - for dir, subdirs, files in os.walk(tmpdir): - for file in files: - file = os.path.join(dir, file) - if file in (logfile, tmpfile,): - continue - - self.upload_file(file, "package") - - except DependencyError, e: - message = "%s: %s" % (e.__class__.__name__, e) - self.update_state("dependency_error", message) - raise - - except DownloadError, e: - message = "%s: %s" % (e.__class__.__name__, e) - self.update_state("download_error", message) - raise - - finally: - if p: - p.destroy() - - # Upload the logfile in any case and if it exists. 
- if os.path.exists(logfile): - self.upload_file(logfile, "log") - - # Cleanup the files we created. - pakfire.util.rm(tmpdir) - - except DependencyError: - # This has already been reported. - raise - - except (DownloadError,): - # Do not take any further action for these exceptions. - pass - - except Exception, e: - # Format the exception and send it to the server. - message = "%s: %s" % (e.__class__.__name__, e) - - self.update_state("failed", message) - raise - - else: - self.update_state("finished") diff --git a/python/pakfire/client/transport.py b/python/pakfire/client/transport.py deleted file mode 100644 index 319c48cf..00000000 --- a/python/pakfire/client/transport.py +++ /dev/null @@ -1,158 +0,0 @@ -#!/usr/bin/python -############################################################################### -# # -# Pakfire - The IPFire package management system # -# Copyright (C) 2011 Pakfire development team # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -############################################################################### - -import httplib -import socket -import ssl -import time -import xmlrpclib - -import logging -log = logging.getLogger("pakfire.client") - -from pakfire.constants import * -from pakfire.i18n import _ - -# Set the default socket timeout to 30 seconds. 
-socket.setdefaulttimeout(30) - - - - -class XMLRPCMixin: - user_agent = "pakfire/%s" % PAKFIRE_VERSION - - def single_request(self, *args, **kwargs): - ret = None - - # Tries can be passed to this method. - tries = kwargs.pop("tries", 100) - timeout = 1 - - while tries: - try: - ret = xmlrpclib.Transport.single_request(self, *args, **kwargs) - - # Catch errors related to the connection. Just try again. - except (socket.error, ssl.SSLError), e: - log.warning("Exception: %s: %s" % (e.__class__.__name__, e)) - - # Presumably, the server closed the connection before sending anything. - except httplib.BadStatusLine: - # Try again immediately. - continue - - # The XML reponse could not be parsed. - except xmlrpclib.ResponseError, e: - log.warning("Exception: %s: %s" % (e.__class__.__name__, e)) - - except xmlrpclib.ProtocolError, e: - if e.errcode == 403: - # Possibly, the user credentials are invalid. - # Cannot go on. - raise XMLRPCForbiddenError(e) - - elif e.errcode == 404: - # Some invalid URL was called. - # Cannot go on. - raise XMLRPCNotFoundError(e) - - elif e.errcode == 500: - # This could have various reasons, so we can not - # be sure to kill connections here. - # But to visualize the issue, we will raise an - # exception on the last try. - if tries == 1: - raise XMLRPCInternalServerError(e) - - elif e.errcode == 503: - # Possibly the hub is not running but the SSL proxy - # is. Just try again in a short time. - pass - - else: - # Log all XMLRPC protocol errors. - log.error(_("XMLRPC protocol error:")) - log.error(" %s" % _("URL: %s") % e.url) - log.error(" %s" % _(" HTTP headers:")) - for header in e.headers.items(): - log.error(" %s: %s" % header) - log.error(" %s" % _("Error code: %s") % e.errcode) - log.error(" %s" % _("Error message: %s") % e.errmsg) - - # If an unhandled error code appeared, we raise an - # error. - raise - - except xmlrpclib.Fault: - raise - - else: - # If request was successful, we can break the loop. 
- break - - # If the request was not successful, we wait a little time to try - # it again. - tries -= 1 - timeout *= 2 - if timeout > 60: - timeout = 60 - - log.warning(_("Trying again in %(timeout)s second(s). %(tries)s tries left.") \ - % { "timeout" : timeout, "tries" : tries }) - time.sleep(timeout) - - else: - raise XMLRPCTransportError, _("Maximum number of tries was reached. Giving up.") - - return ret - - - -class XMLRPCTransport(XMLRPCMixin, xmlrpclib.Transport): - """ - Handles the XMLRPC connection over HTTP. - """ - pass - - -class SafeXMLRPCTransport(XMLRPCMixin, xmlrpclib.SafeTransport): - """ - Handles the XMLRPC connection over HTTPS. - """ - pass - - -class Connection(xmlrpclib.ServerProxy): - """ - Class wrapper that automatically chooses the right transport - method depending on the given URL. - """ - - def __init__(self, url): - # Create transport channel to the server. - if url.startswith("https://"): - transport = SafeXMLRPCTransport() - elif url.startswith("http://"): - transport = XMLRPCTransport() - - xmlrpclib.ServerProxy.__init__(self, url, transport=transport, - allow_none=True) diff --git a/python/pakfire/config.py b/python/pakfire/config.py index 08d9e8c1..1b30391e 100644 --- a/python/pakfire/config.py +++ b/python/pakfire/config.py @@ -253,6 +253,13 @@ class ConfigClient(_Config): }, } + def get_hub_credentials(self): + hub_url = self.get("client", "server") + username = self.get("client", "username") + password = self.get("client", "password") + + return hub_url, username, password + class ConfigDaemon(_Config): files = ["general.conf", "daemon.conf"] @@ -268,3 +275,10 @@ class ConfigDaemon(_Config): "hostname" : system.hostname, }, } + + def get_hub_credentials(self): + hub_url = self.get("daemon", "server") + hostname = self.get("daemon", "hostname") + password = self.get("daemon", "secret") + + return hub_url, hostname, password diff --git a/python/pakfire/daemon.py b/python/pakfire/daemon.py new file mode 100644 index 
00000000..d6e641d7 --- /dev/null +++ b/python/pakfire/daemon.py @@ -0,0 +1,509 @@ +#!/usr/bin/python + +import hashlib +import json +import multiprocessing +import os +import signal +import sys +import tempfile +import time + +import pakfire.base +import pakfire.builder +import pakfire.config +import pakfire.downloader +import pakfire.system +import pakfire.util +from pakfire.system import system + +import base +import transport + +from pakfire.constants import * +from pakfire.i18n import _ + +import logging +log = logging.getLogger("pakfire.daemon") + +class BuildJob(dict): + """ + Wrapper class for build jobs, that are received from the hub. + + This makes accessing attributes more easy. + """ + def __getattr__(self, key): + try: + return self[key] + except KeyError: + raise AttributeError, key + + +class PakfireDaemon(object): + def __init__(self, config): + self.config = config + + # Indicates if this daemon is in running mode. + self.__running = True + + # List of worker processes. + self.__workers = [] + + # Create connection to the hub. + self.transport = transport.PakfireHubTransport(self.config) + + ### Configuration + # Number of workers in waiting state. + self.max_waiting = 1 + + # Number of running workers. + self.max_running = system.cpu_count * 2 + + def run(self, heartbeat=30): + """ + Main loop. + """ + # Register signal handlers. + self.register_signal_handlers() + + # Send our profile to the hub. + self.send_builder_info() + + while self.__running: + time_started = time.time() + + # Send keepalive message. + self.send_keepalive() + + # Spawn a sufficient number of worker processes. + self.spawn_workers_if_needed() + + # Get runtime of this loop iteration. + time_elapsed = time.time() - time_started + + # Wait if the heartbeat time has not been reached, yet. + if time_elapsed < heartbeat: + time.sleep(heartbeat - time_elapsed) + + # Main loop has ended, but we wait until all workers have finished. 
+ self.terminate_all_workers() + + def shutdown(self): + """ + Terminates all workers and exits the daemon. + """ + if not self.__running: + return + + log.info(_("Shutting down...")) + self.__running = False + + def spawn_workers_if_needed(self, *args, **kwargs): + """ + Spawns more workers if needed. + """ + # Do not create any more processes when the daemon is shutting down. + if not self.__running: + return + + # Cleanup all other workers. + self.cleanup_workers() + + # Do not create more workers if there are already enough workers + # active. + if len(self.workers) >= self.max_running: + log.warning("More workers running than allowed") + return + + # Do nothing, if there are already enough workers waiting. + wanted_waiting_workers = self.max_waiting - len(self.waiting_workers) + if wanted_waiting_workers <= 0: + return + + # Spawn a new worker. + for i in range(wanted_waiting_workers): + self.spawn_worker(*args, **kwargs) + + def spawn_worker(self, *args, **kwargs): + """ + Spawns a new worker process. + """ + worker = PakfireWorker(config=self.config, *args, **kwargs) + worker.start() + + log.debug("Spawned new worker process: %s" % worker) + self.__workers.append(worker) + + def terminate_worker(self, worker): + """ + Terminates the given worker. + """ + log.warning(_("Terminating worker process: %s") % worker) + + worker.terminate() + + def terminate_all_workers(self): + """ + Terminates all workers. + """ + for worker in self.workers: + self.terminate_worker(worker) + + # Wait until the worker has finished. + worker.join() + + def remove_worker(self, worker): + """ + Removes a worker from the internal list of worker processes. + """ + assert not worker.is_alive(), "Remove alive worker?" + + log.debug("Removing worker: %s" % worker) + try: + self.__workers.remove(worker) + except: + pass + + def cleanup_workers(self): + """ + Remove workers that are not alive any more. 
+ """ + for worker in self.workers: + if worker.is_alive(): + continue + + self.remove_worker(worker) + + @property + def workers(self): + return self.__workers[:] + + @property + def running_workers(self): + workers = [] + + for worker in self.workers: + if worker.waiting.is_set(): + continue + + workers.append(worker) + + return workers + + @property + def waiting_workers(self): + workers = [] + + for worker in self.workers: + if worker.waiting.is_set(): + workers.append(worker) + + return workers + + # Signal handling. + + def register_signal_handlers(self): + signal.signal(signal.SIGCHLD, self.handle_SIGCHLD) + signal.signal(signal.SIGINT, self.handle_SIGTERM) + signal.signal(signal.SIGTERM, self.handle_SIGTERM) + + def handle_SIGCHLD(self, signum, frame): + """ + Handle signal SIGCHLD. + """ + # Spawn new workers if necessary. + self.spawn_workers_if_needed() + + def handle_SIGTERM(self, signum, frame): + """ + Handle signal SIGTERM. + """ + # Just shut down. + self.shutdown() + + # Talking to the hub. + + def send_builder_info(self): + log.info(_("Sending builder information to hub...")) + + data = { + # CPU info + "cpu_model" : system.cpu_model, + "cpu_count" : system.cpu_count, + "cpu_arch" : system.native_arch, + "cpu_bogomips" : system.cpu_bogomips, + + # Memory + swap + "mem_total" : system.memory, + "swap_total" : system.swap_total, + + # Pakfire + OS + "pakfire_version" : PAKFIRE_VERSION, + "host_key" : self.config.get("signatures", "host_key", None), + "os_name" : system.distro.pretty_name, + + # Supported arches + "supported_arches" : ",".join(system.supported_arches), + } + self.transport.post("/builders/info", data=data) + + def send_keepalive(self): + log.debug("Sending keepalive message to hub...") + + data = { + # Load average. 
+ "loadavg1" : system.loadavg1, + "loadavg5" : system.loadavg5, + "loadavg15" : system.loadavg15, + + # Memory + "mem_total" : system.memory_total, + "mem_free" : system.memory_free, + + # Swap + "swap_total" : system.swap_total, + "swap_free" : system.swap_free, + + # Disk space + "space_free" : self.free_space, + } + self.transport.post("/builders/keepalive", data=data) + + @property + def free_space(self): + mp = system.get_mountpoint(BUILD_ROOT) + + return mp.space_left + + +class PakfireWorker(multiprocessing.Process): + def __init__(self, config, waiting=None): + multiprocessing.Process.__init__(self) + + # Save config. + self.config = config + + # Waiting event. Clear if this worker is running a build. + self.waiting = multiprocessing.Event() + self.waiting.set() + + # Indicates if this worker is running. + self.__running = True + + def run(self): + # Register signal handlers. + self.register_signal_handlers() + + # Create connection to the hub. + self.transport = transport.PakfireHubTransport(self.config) + self.transport.fork() + + while self.__running: + # Try to get a new build job. + job = self.get_new_build_job() + if not job: + continue + + # If we got a job, we are not waiting anymore. + self.waiting.clear() + + # Run the job and return. + return self.execute_job(job) + + def shutdown(self): + self.__running = False + + # When we are just waiting, we can edit right away. + if self.waiting.is_set(): + log.debug("Exiting immediately") + sys.exit(1) + + # XXX figure out what to do, when a build is running. + + # Signal handling. + + def register_signal_handlers(self): + signal.signal(signal.SIGCHLD, self.handle_SIGCHLD) + signal.signal(signal.SIGINT, self.handle_SIGTERM) + signal.signal(signal.SIGTERM, self.handle_SIGTERM) + + def handle_SIGCHLD(self, signum, frame): + """ + Handle signal SIGCHLD. + """ + # Must be here so that SIGCHLD won't be propagated to + # PakfireDaemon. 
+ pass + + def handle_SIGTERM(self, signum, frame): + """ + Handle signal SIGTERM. + """ + self.shutdown() + + def get_new_build_job(self, timeout=600): + log.debug("Requesting new job...") + + try: + job = self.transport.get_json("/builders/jobs/queue", + data={ "timeout" : timeout, }, timeout=timeout) + + if job: + return BuildJob(job) + + # As this is a long poll request, it is okay to time out. + except TransportMaxTriesExceededError: + pass + + def execute_job(self, job): + log.debug("Executing job: %s" % job) + + # Call the function that processes the build and try to catch general + # exceptions and report them to the server. + # If everything goes okay, we tell this the server, too. + try: + # Create a temporary file and a directory for the resulting files. + tmpdir = tempfile.mkdtemp() + tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url)) + logfile = os.path.join(tmpdir, "build.log") + + # Create pakfire configuration instance. + config = pakfire.config.ConfigDaemon() + config.parse(job.config) + + # Create pakfire instance. + p = None + try: + p = pakfire.base.PakfireBuilder(config=config, arch=job.arch) + + # Download the source package. + grabber = pakfire.downloader.PackageDownloader(p) + grabber.urlgrab(job.source_url, filename=tmpfile) + + # Check if the download checksum matches (if provided). + if job.source_hash_sha512: + h = hashlib.new("sha512") + f = open(tmpfile, "rb") + while True: + buf = f.read(BUFFER_SIZE) + if not buf: + break + + h.update(buf) + f.close() + + if not job.source_hash_sha512 == h.hexdigest(): + raise DownloadError, "Hash check did not succeed." + + # Create a new instance of a build environment. + build = pakfire.builder.BuildEnviron(p, tmpfile, + release_build=True, build_id=job.id, logfile=logfile) + + try: + # Create the build environment. + build.start() + + # Update the build status on the server. 
+ self.upload_buildroot(job, build.installed_packages) + self.update_state(job, "running") + + # Run the build (without install test). + build.build(install_test=False) + + # Copy the created packages to the tempdir. + build.copy_result(tmpdir) + + finally: + # Cleanup the build environment. + build.stop() + + # Jippie, build is finished, we are going to upload the files. + self.update_state(job, "uploading") + + # Walk through the result directory and upload all (binary) files. + # Skip that for test builds. + if not job.type == "test": + for dir, subdirs, files in os.walk(tmpdir): + for file in files: + file = os.path.join(dir, file) + if file in (logfile, tmpfile,): + continue + + self.upload_file(job, file, "package") + + except DependencyError, e: + message = "%s: %s" % (e.__class__.__name__, e) + self.update_state(job, "dependency_error", message) + raise + + except DownloadError, e: + message = "%s: %s" % (e.__class__.__name__, e) + self.update_state(job, "download_error", message) + raise + + finally: + if p: + p.destroy() + + # Upload the logfile in any case and if it exists. + if os.path.exists(logfile): + self.upload_file(job, logfile, "log") + + # Cleanup the files we created. + pakfire.util.rm(tmpdir) + + except DependencyError: + # This has already been reported. + raise + + except (DownloadError,): + # Do not take any further action for these exceptions. + pass + + except (KeyboardInterrupt, SystemExit): + self.update_state(job, "aborted") + + except Exception, e: + # Format the exception and send it to the server. + message = "%s: %s" % (e.__class__.__name__, e) + + self.update_state(job, "failed", message) + raise + + else: + self.update_state(job, "finished") + + def update_state(self, job, state, message=None): + """ + Update state of the build job on the hub. 
+ """ + data = { + "message" : message or "", + } + + self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state), + data=data) + + def upload_file(self, job, filename, type): + assert os.path.exists(filename) + assert type in ("package", "log") + + # First upload the file data and save the upload_id. + upload_id = self.transport.upload_file(filename) + + data = { + "type" : type, + } + + # Add the file to the build. + self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id), + data=data) + + def upload_buildroot(self, job, installed_packages): + pkgs = [] + for pkg in installed_packages: + pkgs.append((pkg.friendly_name, pkg.uuid)) + + data = { "buildroot" : json.dumps(pkgs) } + + self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data) diff --git a/python/pakfire/distro.py b/python/pakfire/distro.py index d4cc6e7e..fae20bb3 100644 --- a/python/pakfire/distro.py +++ b/python/pakfire/distro.py @@ -25,7 +25,7 @@ import re import logging log = logging.getLogger("pakfire") -from system import system +import system class Distribution(object): def __init__(self, data=None): @@ -47,8 +47,9 @@ class Distribution(object): return keymap = { - "NAME" : "name", - "VERSION_ID" : "release", + "NAME" : "name", + "PRETTY_NAME" : "pretty_name", + "VERSION_ID" : "release", } data = {} @@ -109,6 +110,14 @@ class Distribution(object): def name(self): return self._data.get("name", "unknown") + @property + def pretty_name(self): + pretty_name = self._data.get("pretty_name", None) + if not pretty_name: + pretty_name = " ".join((self.name, self.release)) + + return pretty_name + @property def release(self): return self._data.get("release", "0") @@ -134,11 +143,11 @@ class Distribution(object): return self._data.get("contact", "N/A") def get_arch(self): - arch = self._data.get("arch", None) or system.arch + arch = self._data.get("arch", None) or system.system.arch # We can not set up a build environment for noarch. 
if arch == "noarch": - arch = system.arch + arch = system.system.arch return arch @@ -228,7 +237,7 @@ class Distribution(object): None to skip the setting of the personality in the build chroot. """ - if self.arch == system.native_arch: + if self.arch == system.system.native_arch: return None arch2personality = { diff --git a/python/pakfire/downloader.py b/python/pakfire/downloader.py index 2315fca0..d9808597 100644 --- a/python/pakfire/downloader.py +++ b/python/pakfire/downloader.py @@ -21,6 +21,7 @@ import json import os +import pycurl import random import logging @@ -28,6 +29,7 @@ log = logging.getLogger("pakfire") from config import _Config +import urlgrabber.grabber from urlgrabber.grabber import URLGrabber, URLGrabError from urlgrabber.mirror import MirrorGroup from urlgrabber.progress import TextMeter @@ -72,6 +74,17 @@ class PakfireGrabber(URLGrabber): URLGrabber.__init__(self, *args, **kwargs) + def fork(self): + """ + Reset Curl object after forking a process. + """ + # XXX this is a very ugly hack and fiddles around with the internals + # or urlgrabber. We should not touch these, but apparently nobody + # else uses multiple threads or processes to talk to their servers. + # So we simply replace Curl with a new instance without closing + # the old one. This should be fixed in urlgrabber and/or pycurl. 
+ urlgrabber.grabber._curl_cache = pycurl.Curl() + def check_offline_mode(self): offline = self.config.get("downloader", "offline") if not offline: diff --git a/python/pakfire/errors.py b/python/pakfire/errors.py index 3ac7b3c1..87c64e33 100644 --- a/python/pakfire/errors.py +++ b/python/pakfire/errors.py @@ -98,6 +98,62 @@ class TransactionCheckError(Error): message = _("Transaction test was not successful") +class TransportError(Error): + pass + + +class TransportConnectionError(TransportError): + pass + + +class TransportConnectionDNSError(TransportConnectionError): + pass + + +class TransportConnectionProxyError(TransportConnectionError): + pass + + +class TransportConnectionReadError(TransportConnectionError): + pass + + +class TransportConnectionResetError(TransportConnectionError): + pass + + +class TransportConnectionTimeoutError(TransportConnectionError): + pass + + +class TransportConnectionWriteError(TransportConnectionError): + pass + + +class TransportSSLError(TransportConnectionError): + pass + + +class TransportSSLCertificateExpiredError(TransportSSLError): + pass + + +class TransportInternalServerError(TransportError): + pass + + +class TransportForbiddenError(TransportError): + pass + + +class TransportMaxTriesExceededError(TransportError): + pass + + +class TransportNotFoundError(TransportError): + pass + + class XMLRPCError(Error): message = _("Generic XMLRPC error.") diff --git a/python/pakfire/system.py b/python/pakfire/system.py index 6d54d5ff..9a5d546a 100644 --- a/python/pakfire/system.py +++ b/python/pakfire/system.py @@ -25,6 +25,8 @@ import multiprocessing import os import socket +import distro + from i18n import _ class System(object): @@ -42,6 +44,13 @@ class System(object): return hn + @property + def distro(self): + if not hasattr(self, "_distro"): + self._distro = distro.Distribution() + + return self._distro + @property def native_arch(self): """ @@ -98,24 +107,32 @@ class System(object): """ return multiprocessing.cpu_count() - 
@property - def cpu_model(self): - # Determine CPU model - cpuinfo = {} + def parse_cpuinfo(self): + ret = {} + with open("/proc/cpuinfo") as f: for line in f.readlines(): - # Break at an empty line, because all information after that - # is redundant. - if not line: + # Only parse the first block. + if line == "\n": break try: - key, value = line.split(":") + # Split the lines by colons. + a, b = line.split(":") + + # Strip whitespace. + a = a.strip() + b = b.strip() + + ret[a] = b except: - pass # Skip invalid lines + pass - key, value = key.strip(), value.strip() - cpuinfo[key] = value + return ret + + @property + def cpu_model(self): + cpuinfo = self.parse_cpuinfo() ret = None if self.arch.startswith("arm"): @@ -132,20 +149,85 @@ class System(object): return ret or _("Could not be determined") @property - def memory(self): - # Determine memory size - memory = 0 + def cpu_bogomips(self): + cpuinfo = self.parse_cpuinfo() + + bogomips = cpuinfo.get("bogomips", None) + try: + return float(bogomips) * self.cpu_count + except: + pass + + def get_loadavg(self): + return os.getloadavg() + + @property + def loadavg1(self): + return self.get_loadavg()[0] + + @property + def loadavg5(self): + return self.get_loadavg()[1] + + @property + def loadavg15(self): + return self.get_loadavg()[2] + + def has_overload(self): + """ + Checks, if the load average is not too high. + + On this is to be decided if a new job is taken. + """ + # If there are more than 2 processes in the process queue per CPU + # core we will assume that the system has heavy load and to not request + # a new job. 
+ return self.loadavg5 >= self.cpu_count * 2 + + def parse_meminfo(self): + ret = {} + with open("/proc/meminfo") as f: - line = f.readline() + for line in f.readlines(): + try: + a, b, c = line.split() - try: - a, b, c = line.split() - except: - pass - else: - memory = int(b) * 1024 + a = a.strip() + a = a.replace(":", "") + b = int(b) + + ret[a] = b * 1024 + except: + pass + + return ret + + @property + def memory_total(self): + meminfo = self.parse_meminfo() + + return meminfo.get("MemTotal", None) + + # For compatibility + memory = memory_total + + @property + def memory_free(self): + meminfo = self.parse_meminfo() + + return meminfo.get("MemFree", None) + + @property + def swap_total(self): + meminfo = self.parse_meminfo() + + return meminfo.get("SwapTotal", None) + + @property + def swap_free(self): + meminfo = self.parse_meminfo() - return memory + return meminfo.get("SwapFree", None) def get_mountpoint(self, path): return Mountpoint(path) diff --git a/python/pakfire/transport.py b/python/pakfire/transport.py new file mode 100644 index 00000000..db760ece --- /dev/null +++ b/python/pakfire/transport.py @@ -0,0 +1,387 @@ +#!/usr/bin/python +############################################################################### +# # +# Pakfire - The IPFire package management system # +# Copyright (C) 2013 Pakfire development team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . 
# +# # +############################################################################### + +from __future__ import division + +import base64 +import hashlib +import json +import os +import time +import urlgrabber +import urllib +import urlparse + +import pakfire.downloader +import pakfire.util + +from pakfire.constants import * +from pakfire.i18n import _ + +import logging +log = logging.getLogger("pakfire.transport") + + +class PakfireHubTransportUploader(object): + """ + Handles the upload of a single file to the hub. + """ + + def __init__(self, transport, filename): + self.transport = transport + self.filename = filename + + def get_upload_id(self): + """ + Gets an upload from the pakfire hub. + """ + # Calculate the SHA1 sum of the file to upload. + h = hashlib.new("sha1") + with open(self.filename, "rb") as f: + while True: + buf = f.read(CHUNK_SIZE) + if not buf: + break + + h.update(buf) + + data = { + "filename" : os.path.basename(self.filename), + "filesize" : os.path.getsize(self.filename), + "hash" : h.hexdigest(), + } + + upload_id = self.transport.get("/uploads/create", data=data) + log.debug("Got upload id: %s" % upload_id) + + return upload_id + + def send_file(self, upload_id, progress_callback=None): + """ + Sends the file content to the server. + + The data is splitted into chunks, which are + sent one after an other. + """ + with open(self.filename, "rb") as f: + # Initial chunk size. + chunk_size = CHUNK_SIZE + + # Count the already transmitted bytes. + transferred = 0 + + while True: + chunk = f.read(chunk_size) + if not chunk: + break + + log.debug("Got chunk of %s bytes" % len(chunk)) + + # Save the time when we started to send this bit. + time_started = time.time() + + # Send the chunk to the server. + self.send_chunk(upload_id, chunk) + + # Save the duration.time after the chunk has been transmitted + # and adjust chunk size to send one chunk per second. 
+ duration = time.time() - time_started + chunk_size = int(chunk_size / duration) + + # Never let chunk_size drop under CHUNK_SIZE: + if chunk_size < CHUNK_SIZE: + chunk_size = CHUNK_SIZE + + # Add up the send amount of data. + transferred += len(chunk) + if progress_callback: + progress_callback(transferred) + + def send_chunk(self, upload_id, data): + """ + Sends a piece of the file to the server. + """ + # Calculate checksum over the chunk data. + h = hashlib.new("sha512") + h.update(data) + chksum = h.hexdigest() + + # Encode data in base64. + data = base64.b64encode(data) + + # Send chunk data to the server. + self.transport.post("/uploads/%s/sendchunk" % upload_id, + data={ "chksum" : chksum, "data" : data }) + + def destroy_upload(self, upload_id): + """ + Destroys the upload on the server. + """ + self.transport.get("/uploads/%s/destroy" % upload_id) + + def finish_upload(self, upload_id): + """ + Signals to the server, that the upload has finished. + """ + self.transport.get("/uploads/%s/finished" % upload_id) + + def run(self): + upload_id = None + + # Create a progress bar. + progress = pakfire.util.make_progress( + os.path.basename(self.filename), os.path.getsize(self.filename), speed=True, eta=True, + ) + + try: + # Get an upload ID. + upload_id = self.get_upload_id() + + # Send the file content. + self.send_file(upload_id, progress_callback=progress.update) + + except: + progress.finish() + + # Remove broken upload from server. + if upload_id: + self.destroy_upload(upload_id) + + # XXX catch fatal errors + raise + + else: + progress.finish() + + # If no exception was raised, the upload + # has finished. + self.finish_upload(upload_id) + + # Return the upload id so some code can actually do something + # with the file on the server. + return upload_id + + +class PakfireHubTransport(object): + """ + Connection to the pakfire hub. + """ + + def __init__(self, config): + self.config = config + + # Create connection to the hub. 
+ self.grabber = pakfire.downloader.PakfireGrabber( + self.config, prefix=self.url, + ) + + def fork(self): + return self.grabber.fork() + + @property + def url(self): + """ + Construct a right URL out of the given + server, username and password. + + Basicly this just adds the credentials + to the URL. + """ + # Get credentials. + server, username, password = self.config.get_hub_credentials() + + # Parse the given URL. + url = urlparse.urlparse(server) + assert url.scheme in ("http", "https") + + # Build new URL. + ret = "%s://" % url.scheme + + # Add credentials if provided. + if username and password: + ret += "%s:%s@" % (username, password) + + # Add path components. + ret += url.netloc + + return ret + + def one_request(self, url, **kwargs): + try: + return self.grabber.urlread(url, **kwargs) + + except urlgrabber.grabber.URLGrabError, e: + # Timeout + if e.errno == 12: + raise TransportConnectionTimeoutError, e + + # Handle common HTTP errors + elif e.errno == 14: + # Connection errors + if e.code == 5: + raise TransportConnectionProxyError, url + elif e.code == 6: + raise TransportConnectionDNSError, url + elif e.code == 7: + raise TransportConnectionResetError, url + elif e.code == 23: + raise TransportConnectionWriteError, url + elif e.code == 26: + raise TransportConnectionReadError, url + + # SSL errors + elif e.code == 52: + raise TransportSSLCertificateExpiredError, url + + # HTTP error codes + elif e.code == 403: + raise TransportForbiddenError, url + elif e.code == 404: + raise TransportNotFoundError, url + elif e.code == 500: + raise TransportInternalServerError, url + + # All other exceptions... 
+ raise + + def request(self, url, tries=None, **kwargs): + # tries = None implies wait infinitely + + while tries or tries is None: + if tries: + tries -= 1 + + try: + return self.one_request(url, **kwargs) + + # 500 - Internal Server Error + except TransportInternalServerError, e: + log.exception("%s" % e.__class__.__name__) + + # Wait a minute before trying again. + time.sleep(60) + + # Retry on connection problems. + except TransportConnectionError, e: + log.exception("%s" % e.__class__.__name__) + + # Wait for 10 seconds. + time.sleep(10) + + raise TransportMaxTriesExceededError + + def escape_args(self, **kwargs): + return urllib.urlencode(kwargs) + + def get(self, url, data={}, **kwargs): + """ + Sends a HTTP GET request to the given URL. + + All given keyword arguments are considered as form data. + """ + params = self.escape_args(**data) + + if params: + url = "%s?%s" % (url, params) + + return self.request(url, **kwargs) + + def post(self, url, data={}, **kwargs): + """ + Sends a HTTP POST request to the given URL. + + All keyword arguments are considered as form data. + """ + params = self.escape_args(**data) + if params: + kwargs.update({ + "data" : params, + }) + + return self.request(url, **kwargs) + + def upload_file(self, filename): + """ + Uploads the given file to the server. + """ + uploader = PakfireHubTransportUploader(self, filename) + upload_id = uploader.run() + + return upload_id + + def get_json(self, *args, **kwargs): + res = self.get(*args, **kwargs) + + # Decode JSON. + if res: + return json.loads(res) + + ### Misc. actions + + def noop(self): + """ + No operation. Just to check if the connection is + working. Returns a random number. + """ + return self.get("/noop") + + def test_code(self, error_code): + assert error_code >= 100 and error_code <= 999 + + self.get("/error/test/%s" % error_code) + + # Build actions + + def build_create(self, filename, build_type, arches=None, distro=None): + """ + Create a new build on the hub. 
+ """ + assert build_type in ("scratch", "release") + + # XXX Check for permission to actually create a build. + + # Upload the source file to the server. + upload_id = self.upload_file(filename) + + data = { + "arches" : ",".join(arches or []), + "build_type" : build_type, + "distro" : distro or "", + "upload_id" : upload_id, + } + + # Then create the build. + build_id = self.get("/builds/create", data=data) + + return build_id or None + + def build_get(self, build_uuid): + return self.get_json("/builds/%s" % build_uuid) + + # Job actions + + def job_get(self, job_uuid): + return self.get_json("/jobs/%s" % job_uuid) + + # Package actions + + def package_get(self, package_uuid): + return self.get_json("/packages/%s" % package_uuid) -- 2.39.2