]> git.ipfire.org Git - pakfire.git/commitdiff
buildservice: Move the old stuff into the new wrapper
authorMichael Tremer <michael.tremer@ipfire.org>
Fri, 27 Oct 2023 17:17:24 +0000 (17:17 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Fri, 27 Oct 2023 17:17:24 +0000 (17:17 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
configure.ac
src/pakfire/buildservice.py
src/pakfire/daemon.py
src/pakfire/hub.py [deleted file]

index 30517deb55ec3a2817252c08371374f01c97653c..ef94a43a4dac8a07dc7454f6a197bed07c8768df 100644 (file)
@@ -116,6 +116,7 @@ CLEANFILES += \
 pakfire_PYTHON = \
        src/pakfire/__init__.py \
        src/pakfire/__version__.py \
+       src/pakfire/buildservice.py \
        src/pakfire/config.py \
        src/pakfire/constants.py \
        src/pakfire/daemon.py \
index b386fa2c9ebbf18a5e9949926978a2269e692c0b..f417f8af2a7db6cabeff3e5f82b6534de9dcc0d3 100644 (file)
@@ -276,7 +276,6 @@ AM_PATH_PYTHON([3.6])
 
 AX_PYTHON_MODULE([cpuinfo], [fatal])
 AX_PYTHON_MODULE([kerberos], [fatal])
-AX_PYTHON_MODULE([progressbar2], [fatal])
 AX_PYTHON_MODULE([psutil], [fatal])
 AX_PYTHON_MODULE([setproctitle], [fatal])
 AX_PYTHON_MODULE([systemd], [fatal])
index 961bb37e7ece60e359231d3a9903de8a3c710680..5df32f9c70f82b1535e0a160290fb32674445f60 100644 (file)
 #                                                                             #
 ###############################################################################
 
-import _pakfire
+import asyncio
+import cpuinfo
+import json
+import kerberos
+import logging
+import os
+import psutil
+import subprocess
+import tempfile
+import tornado.httpclient
+import tornado.simple_httpclient
+import tornado.websocket
+import urllib.parse
+
+from .__version__ import PAKFIRE_VERSION
+from . import _pakfire
+from . import util
+
+# Configure some useful defaults for all requests
+tornado.httpclient.AsyncHTTPClient.configure(
+       None, defaults = {
+               "user_agent" : "pakfire/%s" % PAKFIRE_VERSION,
+       },
+)
+
+# Setup logging
+log = logging.getLogger("pakfire.buildservice")
+
+class AuthError(Exception):
+       """
+               Raised when the client could not authenticate against the build service
+       """
+       pass
+
+
+class TransportError(Exception):
+       pass
+
+
+class TemporaryConnectionError(TransportError):
+       """
+               Raised when there is a temporary connection issue and
+               the request should be tried again.
+       """
+       pass
+
 
 class BuildService(_pakfire.BuildService):
        """
                This wraps the parts of the build service
                that has been implemented in libpakfire.
        """
-       pass
+       def __init__(self):
+               super().__init__()
+
+               # Initialise the HTTP client
+               self.client = tornado.httpclient.AsyncHTTPClient()
+
+       async def _socket(self, path, **kwargs):
+               return await self._request("GET", path,
+
+                       # Enable websocket and ping once every ten seconds
+                       websocket=True,
+                       websocket_ping_interval=10,
+                       websocket_ping_timeout=60,
+
+                       **kwargs,
+               )
+
+       async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
+                       websocket_ping_timeout=None, authenticate=True,
+                       body=None, body_producer=None, on_message_callback=None, **kwargs):
+               headers = {}
+               query_args = {}
+
+               # Make absolute URL
+               url = urllib.parse.urljoin(self.url, path)
+
+               # Change scheme for websocket
+               if websocket and url.startswith("https://"):
+                       url = url.replace("https://", "wss://")
+
+               # Filter all query arguments
+               for arg in kwargs:
+                       # Skip anything that is None
+                       if kwargs[arg] is None:
+                               continue
+
+                       # Add to query arguments
+                       query_args[arg] = kwargs[arg]
+
+               # Encode query arguments
+               query_args = urllib.parse.urlencode(query_args, doseq=True)
+
+               # Add query arguments
+               if method in ("GET", "PUT", "DELETE"):
+                       url = "%s?%s" % (url, query_args)
+
+               # Add any arguments to the body
+               elif method == "POST":
+                       if body is None:
+                               body = query_args
+
+               # Perform Kerberos authentication
+               if authenticate:
+                       krb5_context = self._setup_krb5_context(url)
+
+                       # Fetch the Kerberos client response
+                       krb5_client_response = kerberos.authGSSClientResponse(krb5_context)
+
+                       # Set the Negotiate header
+                       headers |= {
+                               "Authorization" : "Negotiate %s" % krb5_client_response,
+                       }
+
+               # Make the request
+               req = tornado.httpclient.HTTPRequest(
+                       method=method, url=url, headers=headers, body=body,
+
+                       # Give the server more time to respond
+                       request_timeout=60,
+
+                       # Add all the rest
+                       body_producer=body_producer,
+               )
+
+               # Is this a web socket request?
+               if websocket:
+                       return await tornado.websocket.websocket_connect(
+                               req,
+                               ping_interval=websocket_ping_interval,
+                               ping_timeout=websocket_ping_timeout,
+                               on_message_callback=on_message_callback,
+                       )
+
+               # Send the request and wait for a response
+               try:
+                       res = await self.client.fetch(req)
+
+               # Catch any HTTP errors
+               except tornado.httpclient.HTTPError as e:
+                       if e.code in (502, 503):
+                               raise TemporaryConnectionError from e
+
+                       # Re-raise anything else
+                       raise e
+
+               # Perform mutual authentication
+               if authenticate:
+                       for header in res.headers.get_list("WWW-Authenticate"):
+                               # Skip anything that isn't a Negotiate header
+                               if not header.startswith("Negotiate "):
+                                       continue
+
+                               # Fetch the server response
+                               krb5_server_response = header.removeprefix("Negotiate ")
+
+                               # Validate the server response
+                               result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
+                               if not result == kerberos.AUTH_GSS_COMPLETE:
+                                       raise AuthError("Could not verify the Kerberos server response")
+
+                               log.debug("Kerberos Server Response validating succeeded")
+
+                               # Call this so that we won't end in the else block
+                               break
+
+                       # If there were no headers
+                       else:
+                               raise AuthError("Mutual authentication failed")
+
+               # Decode JSON response
+               if res.body:
+                       return json.loads(res.body)
+
+               # Empty response
+               return {}
+
+       async def _proxy(self, cls, *args, **kwargs):
+               conn = cls(self, *args, **kwargs)
+
+               # Create the initial connection
+               await conn.reconnect()
+
+               return conn
+
+       def _setup_krb5_context(self, url):
+               """
+                       Creates the Kerberos context that can be used to perform client
+                       authentication against the server, and mutual authentication for the server.
+               """
+               # Parse the input URL
+               url = urllib.parse.urlparse(url)
+
+               # Create a new client context
+               result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)
+
+               if not result == kerberos.AUTH_GSS_COMPLETE:
+                       raise AuthError("Could not create Kerberos Client context")
+
+               # Next step...
+               try:
+                       result = kerberos.authGSSClientStep(krb5_context, "")
+
+               except kerberos.GSSError as e:
+                       log.error("Kerberos authentication failed: %s" % e)
+
+                       raise AuthError("%s" % e) from e
+
+               if not result == kerberos.AUTH_GSS_CONTINUE:
+                       raise AuthError("Cloud not continue Kerberos authentication")
+
+               return krb5_context
+
+       # Builder
+
+       async def control(self, *args, **kwargs):
+               """
+                       Creates a control connection
+               """
+               return await self._proxy(ControlConnection, *args, **kwargs)
+
+       async def job(self, *args, **kwargs):
+               """
+                       Creates a control connection for a certain job
+               """
+               return await self._proxy(JobControlConnection, *args, **kwargs)
+
+
+class Connection(object):
+       def __init__(self, service, *args, **kwargs):
+               self.service = service
+
+               # The active connection
+               self.conn = None
+
+               # Callbacks
+               self.callbacks = {}
+
+               # Perform custom initialization
+               self.init(*args, **kwargs)
+
+       def init(self, *args, **kwargs):
+               pass
+
+       @property
+       def url(self):
+               raise NotImplementedError
+
+       async def connect(self):
+               """
+                       This will create a connection
+               """
+               return await self.service._socket(self.url,
+                       on_message_callback=self.on_message_callback)
+
+       async def reconnect(self):
+               """
+                       Tries to reconnect for forever
+               """
+               attempts = 0
+
+               while True:
+                       attempts += 1
+
+                       log.debug("Trying to reconnect (attempt %s)..." % attempts)
+
+                       try:
+                               self.conn = await self.connect()
+
+                       # The web service responded with some error
+                       except tornado.httpclient.HTTPClientError as e:
+                               log.error("%s: Received HTTP Error %s" % (self.url, e.code))
+
+                               # 502 - Proxy Error
+                               # 503 - Service Unavailable
+                               if e.code in (502, 503):
+                                       await asyncio.sleep(10)
+
+                               # Raise any unhandled errors
+                               else:
+                                       raise e
+
+                       # The web service did not respond in time
+                       except tornado.simple_httpclient.HTTPTimeoutError as e:
+                               await asyncio.sleep(30)
+
+                       # Raise all other exceptions
+                       except Exception as e:
+                               raise e
+
+                       # If the connection was established successfully, we return
+                       else:
+                               return
+
+       def close(self):
+               """
+                       Closes the connection
+               """
+               if self.conn:
+                       self.conn.close()
+
+       def on_message_callback(self, message):
+               # Fail if no callbacks have been set
+               if not self.callbacks:
+                       raise NotImplementedError
+
+               # Decode the message
+               message = self._decode_json_message(message)
+
+               # Ignore empty messages
+               if message is None:
+                       return
+
+               # Log the received message
+               log.debug("Received message:\n%s" % json.dumps(message, indent=4))
+
+               # Fetch the message type & data
+               type = message.get("type")
+               data = message.get("data")
+
+               # Find a suitable callback
+               try:
+                       callback = self.callbacks[type]
+
+               # Log an error for unknown messages and ignore them
+               except KeyError:
+                       log.error("Received message of unknown type '%s'" % type)
+                       return
+
+               # Call the callback
+               callback(data)
+
+       @staticmethod
+       def _decode_json_message(message):
+               """
+                       Takes a received message and decodes it
+               """
+               # Ignore empty messages
+               if message is None:
+                       return
+
+               try:
+                       message = json.loads(message)
+               except json.JSONDecodeError:
+                       log.error("Could not decode JSON message:\n%s" % message)
+                       return
+
+               return message
+
+       async def write_message(self, message, **kwargs):
+               """
+                       Sends a message but encodes it into JSON first
+               """
+               # This should never happen
+               if not self.conn:
+                       raise RuntimeError("Not connected")
+
+               if isinstance(message, dict):
+                       message = tornado.escape.json_encode(message)
+
+               try:
+                       return await self.conn.write_message(message, **kwargs)
+
+               except tornado.websocket.WebSocketClosedError as e:
+                       # Try to reconnect
+                       await self.reconnect()
+
+                       # Try to send the message again
+                       return await self.write_message(message, **kwargs)
+
+
+class ControlConnection(Connection):
+       url = "/api/v1/builders/control"
+
+       def init(self, daemon):
+               self.daemon = daemon
+
+               # Callbacks
+               self.callbacks = {
+                       "job" : self.daemon.job_received,
+               }
+
+               # Fetch processor information
+               self.cpu = cpuinfo.get_cpu_info()
+
+               # Fetch the native architecture
+               self.native_arch = _pakfire.native_arch()
+
+       async def submit_stats(self):
+               """
+                       Sends stats about this builder
+               """
+               log.debug("Sending stats...")
+
+               # Fetch processor information
+               cpu_times = psutil.cpu_times_percent()
+
+               # Fetch memory/swap information
+               mem  = psutil.virtual_memory()
+               swap = psutil.swap_memory()
+
+               # Fetch load average
+               loadavg = psutil.getloadavg()
+
+               await self.write_message({
+                       "type" : "stats",
+                       "data" : {
+                               # CPU info
+                               "cpu_model"       : self.cpu.get("brand"),
+                               "cpu_count"       : self.cpu.get("count"),
+                               "cpu_arch"        : self.native_arch,
+
+                               # Pakfire + OS
+                               "pakfire_version" : PAKFIRE_VERSION,
+                               "os_name"         : util.get_distro_name(),
+
+                               # CPU Times
+                               "cpu_user"       : cpu_times.user,
+                               "cpu_nice"       : cpu_times.nice,
+                               "cpu_system"     : cpu_times.system,
+                               "cpu_idle"       : cpu_times.idle,
+                               "cpu_iowait"     : cpu_times.iowait,
+                               "cpu_irq"        : cpu_times.irq,
+                               "cpu_softirq"    : cpu_times.softirq,
+                               "cpu_steal"      : cpu_times.steal,
+                               "cpu_guest"      : cpu_times.guest,
+                               "cpu_guest_nice" : cpu_times.guest_nice,
+
+                               # Load average
+                               "loadavg1"       : loadavg[0],
+                               "loadavg5"       : loadavg[1],
+                               "loadavg15"      : loadavg[2],
+
+                               # Memory
+                               "mem_total"      : mem.total,
+                               "mem_available"  : mem.available,
+                               "mem_used"       : mem.used,
+                               "mem_free"       : mem.free,
+                               "mem_active"     : mem.active,
+                               "mem_inactive"   : mem.inactive,
+                               "mem_buffers"    : mem.buffers,
+                               "mem_cached"     : mem.cached,
+                               "mem_shared"     : mem.shared,
+
+                               # Swap
+                               "swap_total"     : swap.total,
+                               "swap_used"      : swap.used,
+                               "swap_free"      : swap.free,
+                       },
+               })
+
+
+class JobControlConnection(Connection):
+       """
+               Proxy for Build Jobs
+       """
+       def init(self, id, worker):
+               self.id = id
+
+               # Callbacks
+               self.callbacks = {
+                       "abort" : worker.abort,
+               }
+
+       @property
+       def url(self):
+               return "/api/v1/jobs/%s" % self.id
+
+       async def finished(self, success, packages=None, logfile=None):
+               """
+                       Will tell the hub that a job has finished
+               """
+               # Upload the log file
+               if logfile:
+                       logfile = await self.service.upload(logfile, filename="%s.log" % self.id)
+
+               # Upload the packages
+               if packages:
+                       for package in packages:
+                               await self.service.upload(package)
+
+               while True:
+                       try:
+                               # Send the request
+                               response = await self.service._request("POST", "/api/v1/jobs/%s/finished" % self.id,
+                                       success="1" if success else "0", logfile=logfile, packages=packages,
+                               )
+
+                       # Try again after a short moment on connection errors
+                       except TemporaryConnectionError as e:
+                               await asyncio.sleep(5)
+
+                       else:
+                               break
+
+               # Handle the response
+               # XXX TODO
+
+       async def log(self, timestamp, level, message):
+               """
+                       Sends a log message to the hub
+               """
+               await self.write_message({
+                       "type" : "log",
+                       "data" : {
+                               "timestamp" : timestamp,
+                               "level"     : level,
+                               "message"   : message,
+                       },
+               })
index d32bf89b3833f6ecfce5cc687392298bd932e576..f1e25db389f7415650fc4d608d0da42d8209badb 100644 (file)
@@ -17,7 +17,6 @@ import tempfile
 from . import _pakfire
 from . import buildservice
 from . import config
-from . import hub
 from . import logger
 
 from pakfire.constants import *
@@ -32,9 +31,6 @@ class Daemon(object):
                self.debug   = debug
                self.verbose = verbose
 
-               # Initialize the connection to the buildservice
-               self.buildservice = buildservice.BuildService()
-
                # Setup logger
                self.log = logger.setup(
                        "pakfire",
@@ -43,9 +39,8 @@ class Daemon(object):
                        debug=self.debug,
                )
 
-               # Connect to the Pakfire Hub
-               self.hub = self.connect_to_hub()
-               self.control = None
+               # Initialize the connection to the buildservice
+               self.service = buildservice.BuildService()
 
                # Set when this process receives a shutdown signal
                self._shutdown_signalled = None
@@ -63,15 +58,6 @@ class Daemon(object):
                """
                return self.config.get("daemon", "ccache_path", "/var/cache/pakfire/ccache")
 
-       def connect_to_hub(self):
-               url = self.config.get("daemon", "server", PAKFIRE_HUB)
-
-               # Host Credentials
-               keytab = self.config.get("daemon", "keytab", None)
-
-               # Create connection to the hub
-               return hub.Hub(url, keytab=keytab)
-
        async def run(self):
                """
                        Main loop.
@@ -83,7 +69,7 @@ class Daemon(object):
                self._shutdown_signalled = asyncio.Event()
 
                # Create the control connection
-               self.control = await self.hub.control(daemon=self)
+               self.control = await self.service.control(daemon=self)
 
                # Run main loop
                while True:
@@ -196,12 +182,17 @@ class Worker(multiprocessing.Process):
                multiprocessing.Process.__init__(self)
                self.daemon = daemon
 
-               self.hub = self.daemon.hub
-               self.log = self.daemon.log
-
                # The job that has been received
                self.data = data
 
+       @property
+       def service(self):
+               return self.daemon.service
+
+       @property
+       def log(self):
+               return self.daemon.log
+
        def run(self):
                self.log.debug("Worker %s has launched" % self.pid)
 
@@ -265,8 +256,8 @@ class Worker(multiprocessing.Process):
                if not pkg:
                        raise ValueError("Did not received a package URL")
 
-               # Connect to the hub
-               self.job = await self.hub.job(self.job_id, worker=self)
+               # Connect to the service
+               self.job = await self.service.job(self.job_id, worker=self)
 
                # Setup build logger
                logger = BuildLogger(self.log, self.job)
@@ -307,7 +298,7 @@ class Worker(multiprocessing.Process):
                                if not self.is_test():
                                        packages = glob.glob("%s/*.pfm" % target)
 
-                       # Notify the hub that the job has finished
+                       # Notify the service that the job has finished
                        finally:
                                await self.job.finished(
                                        success=success,
@@ -430,5 +421,5 @@ class BuildLogger(object):
                        if message is None:
                                continue
 
-                       # Send message to the hub
+                       # Send message to the service
                        await self.job.log(message.created, message.levelno, message.getMessage())
diff --git a/src/pakfire/hub.py b/src/pakfire/hub.py
deleted file mode 100644 (file)
index 6d81967..0000000
+++ /dev/null
@@ -1,687 +0,0 @@
-#!/usr/bin/python3
-###############################################################################
-#                                                                             #
-# Pakfire - The IPFire package management system                              #
-# Copyright (C) 2013 Pakfire development team                                 #
-#                                                                             #
-# This program is free software: you can redistribute it and/or modify        #
-# it under the terms of the GNU General Public License as published by        #
-# the Free Software Foundation, either version 3 of the License, or           #
-# (at your option) any later version.                                         #
-#                                                                             #
-# This program is distributed in the hope that it will be useful,             #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
-# GNU General Public License for more details.                                #
-#                                                                             #
-# You should have received a copy of the GNU General Public License           #
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
-#                                                                             #
-###############################################################################
-
-import asyncio
-import cpuinfo
-import functools
-import hashlib
-import json
-import kerberos
-import logging
-import os.path
-import progressbar2 as progressbar
-import psutil
-import subprocess
-import tempfile
-import tornado.escape
-import tornado.httpclient
-import tornado.simple_httpclient
-import tornado.websocket
-import urllib.parse
-
-from . import _pakfire
-from . import util
-from .constants import *
-from .i18n import _
-
-# Setup logging
-log = logging.getLogger("pakfire.hub")
-
-# Configure some useful defaults for all requests
-tornado.httpclient.AsyncHTTPClient.configure(
-       None, defaults = {
-               "user_agent" : "pakfire/%s" % PAKFIRE_VERSION,
-       },
-)
-
-class AuthError(Exception):
-       """
-               Raised when the client could not authenticate against the hub
-       """
-       pass
-
-
-class TransportError(Exception):
-       pass
-
-
-class TemporaryConnectionError(TransportError):
-       """
-               Raised when there is a temporary connection issue and
-               the request should be tried again.
-       """
-       pass
-
-
-class Hub(object):
-       def __init__(self, url, keytab=None):
-               self.url = url
-
-               # Store path to keytab
-               self.keytab = keytab
-
-               # Initialise the HTTP client
-               self.client = tornado.httpclient.AsyncHTTPClient()
-
-               # XXX support proxies
-
-               # Fetch a TGT with the given keytab
-               if self.keytab:
-                       self._setup_credentials_cache()
-
-                       self._fetch_kerberos_ticket()
-
-       def _setup_credentials_cache(self):
-               """
-                       Create a temporary file to be used as Kerberos credentials cache
-               """
-               self.credentials_cache = tempfile.NamedTemporaryFile()
-
-               os.environ["KRB5CCNAME"] = self.credentials_cache.name
-
-       def _fetch_kerberos_ticket(self):
-               command = ["kinit", "-k", "-t", self.keytab]
-
-               p = subprocess.run(command, check=True, capture_output=True, text=True)
-
-       async def _socket(self, path, **kwargs):
-               return await self._request("GET", path,
-
-                       # Enable websocket and ping once every ten seconds
-                       websocket=True,
-                       websocket_ping_interval=10,
-                       websocket_ping_timeout=60,
-
-                       **kwargs,
-               )
-
-       async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
-                       websocket_ping_timeout=None, authenticate=True,
-                       body=None, body_producer=None, on_message_callback=None, **kwargs):
-               headers = {}
-               query_args = {}
-
-               # Make absolute URL
-               url = urllib.parse.urljoin(self.url, path)
-
-               # Change scheme for websocket
-               if websocket and url.startswith("https://"):
-                       url = url.replace("https://", "wss://")
-
-               # Filter all query arguments
-               for arg in kwargs:
-                       # Skip anything that is None
-                       if kwargs[arg] is None:
-                               continue
-
-                       # Add to query arguments
-                       query_args[arg] = kwargs[arg]
-
-               # Encode query arguments
-               query_args = urllib.parse.urlencode(query_args, doseq=True)
-
-               # Add query arguments
-               if method in ("GET", "PUT", "DELETE"):
-                       url = "%s?%s" % (url, query_args)
-
-               # Add any arguments to the body
-               elif method == "POST":
-                       if body is None:
-                               body = query_args
-
-               # Perform Kerberos authentication
-               if authenticate:
-                       krb5_context = self._setup_krb5_context(url)
-
-                       # Fetch the Kerberos client response
-                       krb5_client_response = kerberos.authGSSClientResponse(krb5_context)
-
-                       # Set the Negotiate header
-                       headers |= {
-                               "Authorization" : "Negotiate %s" % krb5_client_response,
-                       }
-
-               # Make the request
-               req = tornado.httpclient.HTTPRequest(
-                       method=method, url=url, headers=headers, body=body,
-
-                       # Give the server more time to respond
-                       request_timeout=60,
-
-                       # Add all the rest
-                       body_producer=body_producer,
-               )
-
-               # Is this a web socket request?
-               if websocket:
-                       return await tornado.websocket.websocket_connect(
-                               req,
-                               ping_interval=websocket_ping_interval,
-                               ping_timeout=websocket_ping_timeout,
-                               on_message_callback=on_message_callback,
-                       )
-
-               # Send the request and wait for a response
-               try:
-                       res = await self.client.fetch(req)
-
-               # Catch any HTTP errors
-               except tornado.httpclient.HTTPError as e:
-                       if e.code in (502, 503):
-                               raise TemporaryConnectionError from e
-
-                       # Re-raise anything else
-                       raise e
-
-               # Perform mutual authentication
-               if authenticate:
-                       for header in res.headers.get_list("WWW-Authenticate"):
-                               # Skip anything that isn't a Negotiate header
-                               if not header.startswith("Negotiate "):
-                                       continue
-
-                               # Fetch the server response
-                               krb5_server_response = header.removeprefix("Negotiate ")
-
-                               # Validate the server response
-                               result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
-                               if not result == kerberos.AUTH_GSS_COMPLETE:
-                                       raise AuthError("Could not verify the Kerberos server response")
-
-                               log.debug("Kerberos Server Response validating succeeded")
-
-                               # Call this so that we won't end in the else block
-                               break
-
-                       # If there were no headers
-                       else:
-                               raise AuthError("Mutual authentication failed")
-
-               # Decode JSON response
-               if res.body:
-                       return json.loads(res.body)
-
-               # Empty response
-               return {}
-
-       async def _proxy(self, cls, *args, **kwargs):
-               conn = cls(self, *args, **kwargs)
-
-               # Create the initial connection
-               await conn.reconnect()
-
-               return conn
-
-       def _setup_krb5_context(self, url):
-               """
-                       Creates the Kerberos context that can be used to perform client
-                       authentication against the server, and mutual authentication for the server.
-               """
-               # Parse the input URL
-               url = urllib.parse.urlparse(url)
-
-               # Create a new client context
-               result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)
-
-               if not result == kerberos.AUTH_GSS_COMPLETE:
-                       raise AuthError("Could not create Kerberos Client context")
-
-               # Next step...
-               try:
-                       result = kerberos.authGSSClientStep(krb5_context, "")
-
-               except kerberos.GSSError as e:
-                       log.error("Kerberos authentication failed: %s" % e)
-
-                       raise AuthError("%s" % e) from e
-
-               if not result == kerberos.AUTH_GSS_CONTINUE:
-                       raise AuthError("Cloud not continue Kerberos authentication")
-
-               return krb5_context
-
-       # Uploads
-
-       async def upload(self, path, filename=None, show_progress=True):
-               """
-                       Uploads the file to the hub returning the upload ID
-               """
-               log.debug("Uploading %s..." % path)
-
-               # Use the basename of the file if no name was given
-               if filename is None:
-                       filename = os.path.basename(path)
-
-               # Determine the filesize
-               size = os.path.getsize(path)
-
-               # Make progressbar
-               if show_progress:
-                       p = progressbar.ProgressBar(
-                               max_value=size,
-                               widgets=[
-                                       progressbar.FormatCustomText(_("Uploading %s") % filename),
-                                       progressbar.Percentage(),
-                                       progressbar.Bar(),
-                                       progressbar.FileTransferSpeed(),
-                                       progressbar.DataSize(),
-                                       progressbar.AdaptiveETA(),
-                               ],
-                       )
-               else:
-                       p = None
-
-               # Compute a digest
-               digest = self._compute_digest("blake2b", path)
-
-               while True:
-                       # Prepare the file for streaming
-                       body_producer = functools.partial(self._stream_file, path, size, p)
-
-                       # Perform upload
-                       try:
-                               response = await self._request("PUT", "/api/v1/uploads",
-                                       body_producer=body_producer,
-                                       filename=filename, size=size, digest=digest
-                               )
-
-                       # On temporary issues, try again after a few seconds
-                       except TemporaryConnectionError as e:
-                               await asyncio.sleep(5)
-
-                       else:
-                               break
-
-               # Return the upload ID
-               return response.get("id")
-
-       async def delete_upload(self, upload_id):
-               await self._request("DELETE", "/api/v1/uploads/%s" % upload_id)
-
-       async def upload_multi(self, *paths, show_progress=True):
-               """
-                       Upload multiple files
-
-                       If one file could not be uploaded, any other uploads will be deleted
-               """
-               uploads = []
-
-               # Upload everything
-               try:
-                       for path in paths:
-                               upload = await self.upload(path, show_progress=show_progress)
-
-                               # Store the upload ID
-                               uploads.append(upload)
-
-               except Exception as e:
-                       # Remove any previous uploads
-                       await asyncio.gather(
-                               *(self.delete_upload(upload) for upload in uploads),
-                       )
-
-                       # Raise the exception
-                       raise e
-
-               # Return the IDs of the uploads
-               return uploads
-
-       @staticmethod
-       def _stream_file(path, size, p, write):
-               try:
-                       with open(path, "rb") as f:
-                               while True:
-                                       buf = f.read(64 * 1024)
-                                       if not buf:
-                                               break
-
-                                       # Update progressbar
-                                       if p:
-                                               l = len(buf)
-                                               p.increment(l)
-
-                                       write(buf)
-               finally:
-                       # Finish the progressbar
-                       if p:
-                               p.finish()
-
-       @staticmethod
-       def _compute_digest(algo, path):
-               h = hashlib.new(algo)
-
-               with open(path, "rb") as f:
-                       while True:
-                               buf = f.read(64 * 1024)
-                               if not buf:
-                                       break
-
-                               h.update(buf)
-
-               return "%s:%s" % (algo, h.hexdigest())
-
-       @staticmethod
-       def _decode_json_message(message):
-               """
-                       Takes a received message and decodes it.
-
-                       It will then call the callback with the decoded message.
-               """
-               # Ignore empty messages
-               if message is None:
-                       return
-
-               try:
-                       message = json.loads(message)
-               except json.JSONDecodeError:
-                       log.error("Could not decode JSON message:\n%s" % message)
-                       return
-
-               return message
-
-       # Builder
-
-       async def control(self, *args, **kwargs):
-               """
-                       Creates a control connection
-               """
-               return await self._proxy(ControlConnection, *args, **kwargs)
-
-       async def job(self, *args, **kwargs):
-               """
-                       Creates a control connection for a certain job
-               """
-               return await self._proxy(JobControlConnection, *args, **kwargs)
-
-
-class HubObject(object):
-       # Disable Nagle's algorithm?
-       nodelay = False
-
-       def __init__(self, hub, *args, **kwargs):
-               self.hub = hub
-
-               # The active connection
-               self.conn = None
-
-               # Callbacks
-               self.callbacks = {}
-
-               # Perform custom initialization
-               self.init(*args, **kwargs)
-
-       def init(self, *args, **kwargs):
-               pass
-
-       @property
-       def url(self):
-               raise NotImplementedError
-
-       async def connect(self):
-               """
-                       This will create a connection
-               """
-               conn = await self.hub._socket(self.url,
-                       on_message_callback=self.on_message_callback)
-
-               # Disable Nagle's algorithm
-               if self.nodelay:
-                       conn.set_nodelay(True)
-
-               return conn
-
-       async def reconnect(self):
-               """
-                       Tries to reconnect for forever
-               """
-               attempts = 0
-
-               while True:
-                       attempts += 1
-
-                       log.debug("Trying to reconnect (attempt %s)..." % attempts)
-
-                       try:
-                               self.conn = await self.connect()
-
-                       # The web service responded with some error
-                       except tornado.httpclient.HTTPClientError as e:
-                               log.error("%s: Received HTTP Error %s" % (self.url, e.code))
-
-                               # 502 - Proxy Error
-                               # 503 - Service Unavailable
-                               if e.code in (502, 503):
-                                       await asyncio.sleep(10)
-
-                               # Raise any unhandled errors
-                               else:
-                                       raise e
-
-                       # The web service did not respond in time
-                       except tornado.simple_httpclient.HTTPTimeoutError as e:
-                               await asyncio.sleep(30)
-
-                       # Raise all other exceptions
-                       except Exception as e:
-                               raise e
-
-                       # If the connection was established successfully, we return
-                       else:
-                               return
-
-       def close(self):
-               """
-                       Closes the connection
-               """
-               if self.conn:
-                       self.conn.close()
-
-       def on_message_callback(self, message):
-               # Fail if no callbacks have been set
-               if not self.callbacks:
-                       raise NotImplementedError
-
-               # Decode the message
-               message = self.hub._decode_json_message(message)
-
-               # Ignore empty messages
-               if message is None:
-                       return
-
-               # Log the received message
-               log.debug("Received message:\n%s" % json.dumps(message, indent=4))
-
-               # Fetch the message type & data
-               type = message.get("type")
-               data = message.get("data")
-
-               # Find a suitable callback
-               try:
-                       callback = self.callbacks[type]
-
-               # Log an error for unknown messages and ignore them
-               except KeyError:
-                       log.error("Received message of unknown type '%s'" % type)
-                       return
-
-               # Call the callback
-               callback(data)
-
-       async def write_message(self, message, **kwargs):
-               """
-                       Sends a message but encodes it into JSON first
-               """
-               # This should never happen
-               if not self.conn:
-                       raise RuntimeError("Not connected")
-
-               if isinstance(message, dict):
-                       message = tornado.escape.json_encode(message)
-
-               try:
-                       return await self.conn.write_message(message, **kwargs)
-
-               except tornado.websocket.WebSocketClosedError as e:
-                       # Try to reconnect
-                       await self.reconnect()
-
-                       # Try to send the message again
-                       return await self.write_message(message, **kwargs)
-
-
-class ControlConnection(HubObject):
-       url = "/api/v1/builders/control"
-
-       def init(self, daemon):
-               self.daemon = daemon
-
-               # Callbacks
-               self.callbacks = {
-                       "job" : self.daemon.job_received,
-               }
-
-               # Fetch processor information
-               self.cpu = cpuinfo.get_cpu_info()
-
-               # Fetch the native architecture
-               self.native_arch = _pakfire.native_arch()
-
-       async def submit_stats(self):
-               """
-                       Sends stats about this builder
-               """
-               log.debug("Sending stats...")
-
-               # Fetch processor information
-               cpu_times = psutil.cpu_times_percent()
-
-               # Fetch memory/swap information
-               mem  = psutil.virtual_memory()
-               swap = psutil.swap_memory()
-
-               # Fetch load average
-               loadavg = psutil.getloadavg()
-
-               await self.write_message({
-                       "type" : "stats",
-                       "data" : {
-                               # CPU info
-                               "cpu_model"       : self.cpu.get("brand"),
-                               "cpu_count"       : self.cpu.get("count"),
-                               "cpu_arch"        : self.native_arch,
-
-                               # Pakfire + OS
-                               "pakfire_version" : PAKFIRE_VERSION,
-                               "os_name"         : util.get_distro_name(),
-
-                               # CPU Times
-                               "cpu_user"       : cpu_times.user,
-                               "cpu_nice"       : cpu_times.nice,
-                               "cpu_system"     : cpu_times.system,
-                               "cpu_idle"       : cpu_times.idle,
-                               "cpu_iowait"     : cpu_times.iowait,
-                               "cpu_irq"        : cpu_times.irq,
-                               "cpu_softirq"    : cpu_times.softirq,
-                               "cpu_steal"      : cpu_times.steal,
-                               "cpu_guest"      : cpu_times.guest,
-                               "cpu_guest_nice" : cpu_times.guest_nice,
-
-                               # Load average
-                               "loadavg1"       : loadavg[0],
-                               "loadavg5"       : loadavg[1],
-                               "loadavg15"      : loadavg[2],
-
-                               # Memory
-                               "mem_total"      : mem.total,
-                               "mem_available"  : mem.available,
-                               "mem_used"       : mem.used,
-                               "mem_free"       : mem.free,
-                               "mem_active"     : mem.active,
-                               "mem_inactive"   : mem.inactive,
-                               "mem_buffers"    : mem.buffers,
-                               "mem_cached"     : mem.cached,
-                               "mem_shared"     : mem.shared,
-
-                               # Swap
-                               "swap_total"     : swap.total,
-                               "swap_used"      : swap.used,
-                               "swap_free"      : swap.free,
-                       },
-               })
-
-
-class JobControlConnection(HubObject):
-       """
-               Proxy for Build Jobs
-       """
-       def init(self, id, worker):
-               self.id = id
-
-               # Callbacks
-               self.callbacks = {
-                       "abort" : worker.abort,
-               }
-
-       @property
-       def url(self):
-               return "/api/v1/jobs/%s" % self.id
-
-       async def finished(self, success, packages=None, logfile=None):
-               """
-                       Will tell the hub that a job has finished
-               """
-               # Upload the log file
-               if logfile:
-                       logfile = await self.hub.upload(logfile, filename="%s.log" % self.id)
-
-               # Upload the packages
-               if packages:
-                       packages = await self.hub.upload_multi(*packages)
-
-               while True:
-                       try:
-                               # Send the request
-                               response = await self.hub._request("POST", "/api/v1/jobs/%s/finished" % self.id,
-                                       success="1" if success else "0", logfile=logfile, packages=packages,
-                               )
-
-                       # Try again after a short moment on connection errors
-                       except TemporaryConnectionError as e:
-                               await asyncio.sleep(5)
-
-                       else:
-                               break
-
-               # Handle the response
-               # XXX TODO
-
-       async def log(self, timestamp, level, message):
-               """
-                       Sends a log message to the hub
-               """
-               await self.write_message({
-                       "type" : "log",
-                       "data" : {
-                               "timestamp" : timestamp,
-                               "level"     : level,
-                               "message"   : message,
-                       },
-               })