From: Michael Tremer Date: Fri, 27 Oct 2023 17:17:24 +0000 (+0000) Subject: buildservice: Move the old stuff into the new wrapper X-Git-Tag: 0.9.30~1386 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=1ab7529b2eb1b51f7dec3ef69ed8d78be9f33d39;p=pakfire.git buildservice: Move the old stuff into the new wrapper Signed-off-by: Michael Tremer --- diff --git a/Makefile.am b/Makefile.am index 30517deb5..ef94a43a4 100644 --- a/Makefile.am +++ b/Makefile.am @@ -116,6 +116,7 @@ CLEANFILES += \ pakfire_PYTHON = \ src/pakfire/__init__.py \ src/pakfire/__version__.py \ + src/pakfire/buildservice.py \ src/pakfire/config.py \ src/pakfire/constants.py \ src/pakfire/daemon.py \ diff --git a/configure.ac b/configure.ac index b386fa2c9..f417f8af2 100644 --- a/configure.ac +++ b/configure.ac @@ -276,7 +276,6 @@ AM_PATH_PYTHON([3.6]) AX_PYTHON_MODULE([cpuinfo], [fatal]) AX_PYTHON_MODULE([kerberos], [fatal]) -AX_PYTHON_MODULE([progressbar2], [fatal]) AX_PYTHON_MODULE([psutil], [fatal]) AX_PYTHON_MODULE([setproctitle], [fatal]) AX_PYTHON_MODULE([systemd], [fatal]) diff --git a/src/pakfire/buildservice.py b/src/pakfire/buildservice.py index 961bb37e7..5df32f9c7 100644 --- a/src/pakfire/buildservice.py +++ b/src/pakfire/buildservice.py @@ -18,11 +18,514 @@ # # ############################################################################### -import _pakfire +import asyncio +import cpuinfo +import json +import kerberos +import logging +import os +import psutil +import subprocess +import tempfile +import tornado.httpclient +import tornado.simple_httpclient +import tornado.websocket +import urllib.parse + +from .__version__ import PAKFIRE_VERSION +from . import _pakfire +from . import util + +# Configure some useful defaults for all requests +tornado.httpclient.AsyncHTTPClient.configure( + None, defaults = { + "user_agent" : "pakfire/%s" % PAKFIRE_VERSION, + }, +) + +# Setup logging +log = logging.getLogger("pakfire.buildservice") + +class AuthError(Exception): + """ + Raised when the client could not authenticate against the build service + """ + pass + + +class TransportError(Exception): + pass + + +class TemporaryConnectionError(TransportError): + """ + Raised when there is a temporary connection issue and + the request should be tried again. + """ + pass + class BuildService(_pakfire.BuildService): """ This wraps the parts of the build service that has been implemented in libpakfire. """ - pass + def __init__(self): + super().__init__() + + # Initialise the HTTP client + self.client = tornado.httpclient.AsyncHTTPClient() + + async def _socket(self, path, **kwargs): + return await self._request("GET", path, + + # Enable websocket and ping once every ten seconds + websocket=True, + websocket_ping_interval=10, + websocket_ping_timeout=60, + + **kwargs, + ) + + async def _request(self, method, path, websocket=False, websocket_ping_interval=None, + websocket_ping_timeout=None, authenticate=True, + body=None, body_producer=None, on_message_callback=None, **kwargs): + headers = {} + query_args = {} + + # Make absolute URL + url = urllib.parse.urljoin(self.url, path) + + # Change scheme for websocket + if websocket and url.startswith("https://"): + url = url.replace("https://", "wss://") + + # Filter all query arguments + for arg in kwargs: + # Skip anything that is None + if kwargs[arg] is None: + continue + + # Add to query arguments + query_args[arg] = kwargs[arg] + + # Encode query arguments + query_args = urllib.parse.urlencode(query_args, doseq=True) + + # Add query arguments + if method in ("GET", "PUT", "DELETE"): + url = "%s?%s" % (url, query_args) + + # Add any arguments to the body + elif method == "POST": + if body is None: + body = query_args + + # Perform Kerberos authentication + if authenticate: + krb5_context = self._setup_krb5_context(url) + + # Fetch the Kerberos client response + krb5_client_response = kerberos.authGSSClientResponse(krb5_context) + + # Set the Negotiate header + headers |= { + "Authorization" : "Negotiate %s" % krb5_client_response, + } + + # Make the request + req = tornado.httpclient.HTTPRequest( + method=method, url=url, headers=headers, body=body, + + # Give the server more time to respond + request_timeout=60, + + # Add all the rest + body_producer=body_producer, + ) + + # Is this a web socket request? + if websocket: + return await tornado.websocket.websocket_connect( + req, + ping_interval=websocket_ping_interval, + ping_timeout=websocket_ping_timeout, + on_message_callback=on_message_callback, + ) + + # Send the request and wait for a response + try: + res = await self.client.fetch(req) + + # Catch any HTTP errors + except tornado.httpclient.HTTPError as e: + if e.code in (502, 503): + raise TemporaryConnectionError from e + + # Re-raise anything else + raise e + + # Perform mutual authentication + if authenticate: + for header in res.headers.get_list("WWW-Authenticate"): + # Skip anything that isn't a Negotiate header + if not header.startswith("Negotiate "): + continue + + # Fetch the server response + krb5_server_response = header.removeprefix("Negotiate ") + + # Validate the server response + result = kerberos.authGSSClientStep(krb5_context, krb5_server_response) + if not result == kerberos.AUTH_GSS_COMPLETE: + raise AuthError("Could not verify the Kerberos server response") + + log.debug("Kerberos Server Response validating succeeded") + + # Call this so that we won't end in the else block + break + + # If there were no headers + else: + raise AuthError("Mutual authentication failed") + + # Decode JSON response + if res.body: + return json.loads(res.body) + + # Empty response + return {} + + async def _proxy(self, cls, *args, **kwargs): + conn = cls(self, *args, **kwargs) + + # Create the initial connection + await conn.reconnect() + + return conn + + def _setup_krb5_context(self, url): + """ + Creates the Kerberos context that can be used to perform client + authentication against the server, and mutual authentication for the server. + """ + # Parse the input URL + url = urllib.parse.urlparse(url) + + # Create a new client context + result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname) + + if not result == kerberos.AUTH_GSS_COMPLETE: + raise AuthError("Could not create Kerberos Client context") + + # Next step... + try: + result = kerberos.authGSSClientStep(krb5_context, "") + + except kerberos.GSSError as e: + log.error("Kerberos authentication failed: %s" % e) + + raise AuthError("%s" % e) from e + + if not result == kerberos.AUTH_GSS_CONTINUE: + raise AuthError("Cloud not continue Kerberos authentication") + + return krb5_context + + # Builder + + async def control(self, *args, **kwargs): + """ + Creates a control connection + """ + return await self._proxy(ControlConnection, *args, **kwargs) + + async def job(self, *args, **kwargs): + """ + Creates a control connection for a certain job + """ + return await self._proxy(JobControlConnection, *args, **kwargs) + + +class Connection(object): + def __init__(self, service, *args, **kwargs): + self.service = service + + # The active connection + self.conn = None + + # Callbacks + self.callbacks = {} + + # Perform custom initialization + self.init(*args, **kwargs) + + def init(self, *args, **kwargs): + pass + + @property + def url(self): + raise NotImplementedError + + async def connect(self): + """ + This will create a connection + """ + return await self.service._socket(self.url, + on_message_callback=self.on_message_callback) + + async def reconnect(self): + """ + Tries to reconnect for forever + """ + attempts = 0 + + while True: + attempts += 1 + + log.debug("Trying to reconnect (attempt %s)..." % attempts) + + try: + self.conn = await self.connect() + + # The web service responded with some error + except tornado.httpclient.HTTPClientError as e: + log.error("%s: Received HTTP Error %s" % (self.url, e.code)) + + # 502 - Proxy Error + # 503 - Service Unavailable + if e.code in (502, 503): + await asyncio.sleep(10) + + # Raise any unhandled errors + else: + raise e + + # The web service did not respond in time + except tornado.simple_httpclient.HTTPTimeoutError as e: + await asyncio.sleep(30) + + # Raise all other exceptions + except Exception as e: + raise e + + # If the connection was established successfully, we return + else: + return + + def close(self): + """ + Closes the connection + """ + if self.conn: + self.conn.close() + + def on_message_callback(self, message): + # Fail if no callbacks have been set + if not self.callbacks: + raise NotImplementedError + + # Decode the message + message = self._decode_json_message(message) + + # Ignore empty messages + if message is None: + return + + # Log the received message + log.debug("Received message:\n%s" % json.dumps(message, indent=4)) + + # Fetch the message type & data + type = message.get("type") + data = message.get("data") + + # Find a suitable callback + try: + callback = self.callbacks[type] + + # Log an error for unknown messages and ignore them + except KeyError: + log.error("Received message of unknown type '%s'" % type) + return + + # Call the callback + callback(data) + + @staticmethod + def _decode_json_message(message): + """ + Takes a received message and decodes it + """ + # Ignore empty messages + if message is None: + return + + try: + message = json.loads(message) + except json.JSONDecodeError: + log.error("Could not decode JSON message:\n%s" % message) + return + + return message + + async def write_message(self, message, **kwargs): + """ + Sends a message but encodes it into JSON first + """ + # This should never happen + if not self.conn: + raise RuntimeError("Not connected") + + if isinstance(message, dict): + message = tornado.escape.json_encode(message) + + try: + return await self.conn.write_message(message, **kwargs) + + except tornado.websocket.WebSocketClosedError as e: + # Try to reconnect + await self.reconnect() + + # Try to send the message again + return await self.write_message(message, **kwargs) + + +class ControlConnection(Connection): + url = "/api/v1/builders/control" + + def init(self, daemon): + self.daemon = daemon + + # Callbacks + self.callbacks = { + "job" : self.daemon.job_received, + } + + # Fetch processor information + self.cpu = cpuinfo.get_cpu_info() + + # Fetch the native architecture + self.native_arch = _pakfire.native_arch() + + async def submit_stats(self): + """ + Sends stats about this builder + """ + log.debug("Sending stats...") + + # Fetch processor information + cpu_times = psutil.cpu_times_percent() + + # Fetch memory/swap information + mem = psutil.virtual_memory() + swap = psutil.swap_memory() + + # Fetch load average + loadavg = psutil.getloadavg() + + await self.write_message({ + "type" : "stats", + "data" : { + # CPU info + "cpu_model" : self.cpu.get("brand"), + "cpu_count" : self.cpu.get("count"), + "cpu_arch" : self.native_arch, + + # Pakfire + OS + "pakfire_version" : PAKFIRE_VERSION, + "os_name" : util.get_distro_name(), + + # CPU Times + "cpu_user" : cpu_times.user, + "cpu_nice" : cpu_times.nice, + "cpu_system" : cpu_times.system, + "cpu_idle" : cpu_times.idle, + "cpu_iowait" : cpu_times.iowait, + "cpu_irq" : cpu_times.irq, + "cpu_softirq" : cpu_times.softirq, + "cpu_steal" : cpu_times.steal, + "cpu_guest" : cpu_times.guest, + "cpu_guest_nice" : cpu_times.guest_nice, + + # Load average + "loadavg1" : loadavg[0], + "loadavg5" : loadavg[1], + "loadavg15" : loadavg[2], + + # Memory + "mem_total" : mem.total, + "mem_available" : mem.available, + "mem_used" : mem.used, + "mem_free" : mem.free, + "mem_active" : mem.active, + "mem_inactive" : mem.inactive, + "mem_buffers" : mem.buffers, + "mem_cached" : mem.cached, + "mem_shared" : mem.shared, + + # Swap + "swap_total" : swap.total, + "swap_used" : swap.used, + "swap_free" : swap.free, + }, + }) + + +class JobControlConnection(Connection): + """ + Proxy for Build Jobs + """ + def init(self, id, worker): + self.id = id + + # Callbacks + self.callbacks = { + "abort" : worker.abort, + } + + @property + def url(self): + return "/api/v1/jobs/%s" % self.id + + async def finished(self, success, packages=None, logfile=None): + """ + Will tell the hub that a job has finished + """ + # Upload the log file + if logfile: + logfile = await self.service.upload(logfile, filename="%s.log" % self.id) + + # Upload the packages + if packages: + for package in packages: + await self.service.upload(package) + + while True: + try: + # Send the request + response = await self.service._request("POST", "/api/v1/jobs/%s/finished" % self.id, + success="1" if success else "0", logfile=logfile, packages=packages, + ) + + # Try again after a short moment on connection errors + except TemporaryConnectionError as e: + await asyncio.sleep(5) + + else: + break + + # Handle the response + # XXX TODO + + async def log(self, timestamp, level, message): + """ + Sends a log message to the hub + """ + await self.write_message({ + "type" : "log", + "data" : { + "timestamp" : timestamp, + "level" : level, + "message" : message, + }, + }) diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index d32bf89b3..f1e25db38 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -17,7 +17,6 @@ import tempfile from . import _pakfire from . import buildservice from . import config -from . import hub from . import logger from pakfire.constants import * @@ -32,9 +31,6 @@ class Daemon(object): self.debug = debug self.verbose = verbose - # Initialize the connection to the buildservice - self.buildservice = buildservice.BuildService() - # Setup logger self.log = logger.setup( "pakfire", @@ -43,9 +39,8 @@ class Daemon(object): debug=self.debug, ) - # Connect to the Pakfire Hub - self.hub = self.connect_to_hub() - self.control = None + # Initialize the connection to the buildservice + self.service = buildservice.BuildService() # Set when this process receives a shutdown signal self._shutdown_signalled = None @@ -63,15 +58,6 @@ class Daemon(object): """ return self.config.get("daemon", "ccache_path", "/var/cache/pakfire/ccache") - def connect_to_hub(self): - url = self.config.get("daemon", "server", PAKFIRE_HUB) - - # Host Credentials - keytab = self.config.get("daemon", "keytab", None) - - # Create connection to the hub - return hub.Hub(url, keytab=keytab) - async def run(self): """ Main loop. @@ -83,7 +69,7 @@ class Daemon(object): self._shutdown_signalled = asyncio.Event() # Create the control connection - self.control = await self.hub.control(daemon=self) + self.control = await self.service.control(daemon=self) # Run main loop while True: @@ -196,12 +182,17 @@ class Worker(multiprocessing.Process): multiprocessing.Process.__init__(self) self.daemon = daemon - self.hub = self.daemon.hub - self.log = self.daemon.log - # The job that has been received self.data = data + @property + def service(self): + return self.daemon.service + + @property + def log(self): + return self.daemon.log + def run(self): self.log.debug("Worker %s has launched" % self.pid) @@ -265,8 +256,8 @@ class Worker(multiprocessing.Process): if not pkg: raise ValueError("Did not received a package URL") - # Connect to the hub - self.job = await self.hub.job(self.job_id, worker=self) + # Connect to the service + self.job = await self.service.job(self.job_id, worker=self) # Setup build logger logger = BuildLogger(self.log, self.job) @@ -307,7 +298,7 @@ class Worker(multiprocessing.Process): if not self.is_test(): packages = glob.glob("%s/*.pfm" % target) - # Notify the hub that the job has finished + # Notify the service that the job has finished finally: await self.job.finished( success=success, @@ -430,5 +421,5 @@ class BuildLogger(object): if message is None: continue - # Send message to the hub + # Send message to the service await self.job.log(message.created, message.levelno, message.getMessage()) diff --git a/src/pakfire/hub.py b/src/pakfire/hub.py deleted file mode 100644 index 6d819673e..000000000 --- a/src/pakfire/hub.py +++ /dev/null @@ -1,687 +0,0 @@ -#!/usr/bin/python3 -############################################################################### -# # -# Pakfire - The IPFire package management system # -# Copyright (C) 2013 Pakfire development team # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -############################################################################### - -import asyncio -import cpuinfo -import functools -import hashlib -import json -import kerberos -import logging -import os.path -import progressbar2 as progressbar -import psutil -import subprocess -import tempfile -import tornado.escape -import tornado.httpclient -import tornado.simple_httpclient -import tornado.websocket -import urllib.parse - -from . import _pakfire -from . import util -from .constants import * -from .i18n import _ - -# Setup logging -log = logging.getLogger("pakfire.hub") - -# Configure some useful defaults for all requests -tornado.httpclient.AsyncHTTPClient.configure( - None, defaults = { - "user_agent" : "pakfire/%s" % PAKFIRE_VERSION, - }, -) - -class AuthError(Exception): - """ - Raised when the client could not authenticate against the hub - """ - pass - - -class TransportError(Exception): - pass - - -class TemporaryConnectionError(TransportError): - """ - Raised when there is a temporary connection issue and - the request should be tried again. - """ - pass - - -class Hub(object): - def __init__(self, url, keytab=None): - self.url = url - - # Store path to keytab - self.keytab = keytab - - # Initialise the HTTP client - self.client = tornado.httpclient.AsyncHTTPClient() - - # XXX support proxies - - # Fetch a TGT with the given keytab - if self.keytab: - self._setup_credentials_cache() - - self._fetch_kerberos_ticket() - - def _setup_credentials_cache(self): - """ - Create a temporary file to be used as Kerberos credentials cache - """ - self.credentials_cache = tempfile.NamedTemporaryFile() - - os.environ["KRB5CCNAME"] = self.credentials_cache.name - - def _fetch_kerberos_ticket(self): - command = ["kinit", "-k", "-t", self.keytab] - - p = subprocess.run(command, check=True, capture_output=True, text=True) - - async def _socket(self, path, **kwargs): - return await self._request("GET", path, - - # Enable websocket and ping once every ten seconds - websocket=True, - websocket_ping_interval=10, - websocket_ping_timeout=60, - - **kwargs, - ) - - async def _request(self, method, path, websocket=False, websocket_ping_interval=None, - websocket_ping_timeout=None, authenticate=True, - body=None, body_producer=None, on_message_callback=None, **kwargs): - headers = {} - query_args = {} - - # Make absolute URL - url = urllib.parse.urljoin(self.url, path) - - # Change scheme for websocket - if websocket and url.startswith("https://"): - url = url.replace("https://", "wss://") - - # Filter all query arguments - for arg in kwargs: - # Skip anything that is None - if kwargs[arg] is None: - continue - - # Add to query arguments - query_args[arg] = kwargs[arg] - - # Encode query arguments - query_args = urllib.parse.urlencode(query_args, doseq=True) - - # Add query arguments - if method in ("GET", "PUT", "DELETE"): - url = "%s?%s" % (url, query_args) - - # Add any arguments to the body - elif method == "POST": - if body is None: - body = query_args - - # Perform Kerberos authentication - if authenticate: - krb5_context = self._setup_krb5_context(url) - - # Fetch the Kerberos client response - krb5_client_response = kerberos.authGSSClientResponse(krb5_context) - - # Set the Negotiate header - headers |= { - "Authorization" : "Negotiate %s" % krb5_client_response, - } - - # Make the request - req = tornado.httpclient.HTTPRequest( - method=method, url=url, headers=headers, body=body, - - # Give the server more time to respond - request_timeout=60, - - # Add all the rest - body_producer=body_producer, - ) - - # Is this a web socket request? - if websocket: - return await tornado.websocket.websocket_connect( - req, - ping_interval=websocket_ping_interval, - ping_timeout=websocket_ping_timeout, - on_message_callback=on_message_callback, - ) - - # Send the request and wait for a response - try: - res = await self.client.fetch(req) - - # Catch any HTTP errors - except tornado.httpclient.HTTPError as e: - if e.code in (502, 503): - raise TemporaryConnectionError from e - - # Re-raise anything else - raise e - - # Perform mutual authentication - if authenticate: - for header in res.headers.get_list("WWW-Authenticate"): - # Skip anything that isn't a Negotiate header - if not header.startswith("Negotiate "): - continue - - # Fetch the server response - krb5_server_response = header.removeprefix("Negotiate ") - - # Validate the server response - result = kerberos.authGSSClientStep(krb5_context, krb5_server_response) - if not result == kerberos.AUTH_GSS_COMPLETE: - raise AuthError("Could not verify the Kerberos server response") - - log.debug("Kerberos Server Response validating succeeded") - - # Call this so that we won't end in the else block - break - - # If there were no headers - else: - raise AuthError("Mutual authentication failed") - - # Decode JSON response - if res.body: - return json.loads(res.body) - - # Empty response - return {} - - async def _proxy(self, cls, *args, **kwargs): - conn = cls(self, *args, **kwargs) - - # Create the initial connection - await conn.reconnect() - - return conn - - def _setup_krb5_context(self, url): - """ - Creates the Kerberos context that can be used to perform client - authentication against the server, and mutual authentication for the server. - """ - # Parse the input URL - url = urllib.parse.urlparse(url) - - # Create a new client context - result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname) - - if not result == kerberos.AUTH_GSS_COMPLETE: - raise AuthError("Could not create Kerberos Client context") - - # Next step... - try: - result = kerberos.authGSSClientStep(krb5_context, "") - - except kerberos.GSSError as e: - log.error("Kerberos authentication failed: %s" % e) - - raise AuthError("%s" % e) from e - - if not result == kerberos.AUTH_GSS_CONTINUE: - raise AuthError("Cloud not continue Kerberos authentication") - - return krb5_context - - # Uploads - - async def upload(self, path, filename=None, show_progress=True): - """ - Uploads the file to the hub returning the upload ID - """ - log.debug("Uploading %s..." % path) - - # Use the basename of the file if no name was given - if filename is None: - filename = os.path.basename(path) - - # Determine the filesize - size = os.path.getsize(path) - - # Make progressbar - if show_progress: - p = progressbar.ProgressBar( - max_value=size, - widgets=[ - progressbar.FormatCustomText(_("Uploading %s") % filename), - progressbar.Percentage(), - progressbar.Bar(), - progressbar.FileTransferSpeed(), - progressbar.DataSize(), - progressbar.AdaptiveETA(), - ], - ) - else: - p = None - - # Compute a digest - digest = self._compute_digest("blake2b", path) - - while True: - # Prepare the file for streaming - body_producer = functools.partial(self._stream_file, path, size, p) - - # Perform upload - try: - response = await self._request("PUT", "/api/v1/uploads", - body_producer=body_producer, - filename=filename, size=size, digest=digest - ) - - # On temporary issues, try again after a few seconds - except TemporaryConnectionError as e: - await asyncio.sleep(5) - - else: - break - - # Return the upload ID - return response.get("id") - - async def delete_upload(self, upload_id): - await self._request("DELETE", "/api/v1/uploads/%s" % upload_id) - - async def upload_multi(self, *paths, show_progress=True): - """ - Upload multiple files - - If one file could not be uploaded, any other uploads will be deleted - """ - uploads = [] - - # Upload everything - try: - for path in paths: - upload = await self.upload(path, show_progress=show_progress) - - # Store the upload ID - uploads.append(upload) - - except Exception as e: - # Remove any previous uploads - await asyncio.gather( - *(self.delete_upload(upload) for upload in uploads), - ) - - # Raise the exception - raise e - - # Return the IDs of the uploads - return uploads - - @staticmethod - def _stream_file(path, size, p, write): - try: - with open(path, "rb") as f: - while True: - buf = f.read(64 * 1024) - if not buf: - break - - # Update progressbar - if p: - l = len(buf) - p.increment(l) - - write(buf) - finally: - # Finish the progressbar - if p: - p.finish() - - @staticmethod - def _compute_digest(algo, path): - h = hashlib.new(algo) - - with open(path, "rb") as f: - while True: - buf = f.read(64 * 1024) - if not buf: - break - - h.update(buf) - - return "%s:%s" % (algo, h.hexdigest()) - - @staticmethod - def _decode_json_message(message): - """ - Takes a received message and decodes it. - - It will then call the callback with the decoded message. - """ - # Ignore empty messages - if message is None: - return - - try: - message = json.loads(message) - except json.JSONDecodeError: - log.error("Could not decode JSON message:\n%s" % message) - return - - return message - - # Builder - - async def control(self, *args, **kwargs): - """ - Creates a control connection - """ - return await self._proxy(ControlConnection, *args, **kwargs) - - async def job(self, *args, **kwargs): - """ - Creates a control connection for a certain job - """ - return await self._proxy(JobControlConnection, *args, **kwargs) - - -class HubObject(object): - # Disable Nagle's algorithm? - nodelay = False - - def __init__(self, hub, *args, **kwargs): - self.hub = hub - - # The active connection - self.conn = None - - # Callbacks - self.callbacks = {} - - # Perform custom initialization - self.init(*args, **kwargs) - - def init(self, *args, **kwargs): - pass - - @property - def url(self): - raise NotImplementedError - - async def connect(self): - """ - This will create a connection - """ - conn = await self.hub._socket(self.url, - on_message_callback=self.on_message_callback) - - # Disable Nagle's algorithm - if self.nodelay: - conn.set_nodelay(True) - - return conn - - async def reconnect(self): - """ - Tries to reconnect for forever - """ - attempts = 0 - - while True: - attempts += 1 - - log.debug("Trying to reconnect (attempt %s)..." % attempts) - - try: - self.conn = await self.connect() - - # The web service responded with some error - except tornado.httpclient.HTTPClientError as e: - log.error("%s: Received HTTP Error %s" % (self.url, e.code)) - - # 502 - Proxy Error - # 503 - Service Unavailable - if e.code in (502, 503): - await asyncio.sleep(10) - - # Raise any unhandled errors - else: - raise e - - # The web service did not respond in time - except tornado.simple_httpclient.HTTPTimeoutError as e: - await asyncio.sleep(30) - - # Raise all other exceptions - except Exception as e: - raise e - - # If the connection was established successfully, we return - else: - return - - def close(self): - """ - Closes the connection - """ - if self.conn: - self.conn.close() - - def on_message_callback(self, message): - # Fail if no callbacks have been set - if not self.callbacks: - raise NotImplementedError - - # Decode the message - message = self.hub._decode_json_message(message) - - # Ignore empty messages - if message is None: - return - - # Log the received message - log.debug("Received message:\n%s" % json.dumps(message, indent=4)) - - # Fetch the message type & data - type = message.get("type") - data = message.get("data") - - # Find a suitable callback - try: - callback = self.callbacks[type] - - # Log an error for unknown messages and ignore them - except KeyError: - log.error("Received message of unknown type '%s'" % type) - return - - # Call the callback - callback(data) - - async def write_message(self, message, **kwargs): - """ - Sends a message but encodes it into JSON first - """ - # This should never happen - if not self.conn: - raise RuntimeError("Not connected") - - if isinstance(message, dict): - message = tornado.escape.json_encode(message) - - try: - return await self.conn.write_message(message, **kwargs) - - except tornado.websocket.WebSocketClosedError as e: - # Try to reconnect - await self.reconnect() - - # Try to send the message again - return await self.write_message(message, **kwargs) - - -class ControlConnection(HubObject): - url = "/api/v1/builders/control" - - def init(self, daemon): - self.daemon = daemon - - # Callbacks - self.callbacks = { - "job" : self.daemon.job_received, - } - - # Fetch processor information - self.cpu = cpuinfo.get_cpu_info() - - # Fetch the native architecture - self.native_arch = _pakfire.native_arch() - - async def submit_stats(self): - """ - Sends stats about this builder - """ - log.debug("Sending stats...") - - # Fetch processor information - cpu_times = psutil.cpu_times_percent() - - # Fetch memory/swap information - mem = psutil.virtual_memory() - swap = psutil.swap_memory() - - # Fetch load average - loadavg = psutil.getloadavg() - - await self.write_message({ - "type" : "stats", - "data" : { - # CPU info - "cpu_model" : self.cpu.get("brand"), - "cpu_count" : self.cpu.get("count"), - "cpu_arch" : self.native_arch, - - # Pakfire + OS - "pakfire_version" : PAKFIRE_VERSION, - "os_name" : util.get_distro_name(), - - # CPU Times - "cpu_user" : cpu_times.user, - "cpu_nice" : cpu_times.nice, - "cpu_system" : cpu_times.system, - "cpu_idle" : cpu_times.idle, - "cpu_iowait" : cpu_times.iowait, - "cpu_irq" : cpu_times.irq, - "cpu_softirq" : cpu_times.softirq, - "cpu_steal" : cpu_times.steal, - "cpu_guest" : cpu_times.guest, - "cpu_guest_nice" : cpu_times.guest_nice, - - # Load average - "loadavg1" : loadavg[0], - "loadavg5" : loadavg[1], - "loadavg15" : loadavg[2], - - # Memory - "mem_total" : mem.total, - "mem_available" : mem.available, - "mem_used" : mem.used, - "mem_free" : mem.free, - "mem_active" : mem.active, - "mem_inactive" : mem.inactive, - "mem_buffers" : mem.buffers, - "mem_cached" : mem.cached, - "mem_shared" : mem.shared, - - # Swap - "swap_total" : swap.total, - "swap_used" : swap.used, - "swap_free" : swap.free, - }, - }) - - -class JobControlConnection(HubObject): - """ - Proxy for Build Jobs - """ - def init(self, id, worker): - self.id = id - - # Callbacks - self.callbacks = { - "abort" : worker.abort, - } - - @property - def url(self): - return "/api/v1/jobs/%s" % self.id - - async def finished(self, success, packages=None, logfile=None): - """ - Will tell the hub that a job has finished - """ - # Upload the log file - if logfile: - logfile = await self.hub.upload(logfile, filename="%s.log" % self.id) - - # Upload the packages - if packages: - packages = await self.hub.upload_multi(*packages) - - while True: - try: - # Send the request - response = await self.hub._request("POST", "/api/v1/jobs/%s/finished" % self.id, - success="1" if success else "0", logfile=logfile, packages=packages, - ) - - # Try again after a short moment on connection errors - except TemporaryConnectionError as e: - await asyncio.sleep(5) - - else: - break - - # Handle the response - # XXX TODO - - async def log(self, timestamp, level, message): - """ - Sends a log message to the hub - """ - await self.write_message({ - "type" : "log", - "data" : { - "timestamp" : timestamp, - "level" : level, - "message" : message, - }, - })