# #
###############################################################################
-import _pakfire
+import asyncio
+import cpuinfo
+import json
+import kerberos
+import logging
+import os
+import psutil
+import subprocess
+import tempfile
+import tornado.escape
+import tornado.httpclient
+import tornado.simple_httpclient
+import tornado.websocket
+import urllib.parse
+
+from .__version__ import PAKFIRE_VERSION
+from . import _pakfire
+from . import util
+
+# Configure some useful defaults for all requests
+tornado.httpclient.AsyncHTTPClient.configure(
+ None, defaults = {
+ "user_agent" : "pakfire/%s" % PAKFIRE_VERSION,
+ },
+)
+
+# Setup logging
+log = logging.getLogger("pakfire.buildservice")
+
+class AuthError(Exception):
+ """
+ Raised when the client could not authenticate against the build service
+ """
+ pass
+
+
+class TransportError(Exception):
+ pass
+
+
+class TemporaryConnectionError(TransportError):
+ """
+ Raised when there is a temporary connection issue and
+ the request should be tried again.
+ """
+ pass
+
class BuildService(_pakfire.BuildService):
"""
This wraps the parts of the build service
that has been implemented in libpakfire.
"""
- pass
+ def __init__(self):
+ super().__init__()
+
+ # Initialise the HTTP client
+ self.client = tornado.httpclient.AsyncHTTPClient()
+
+ async def _socket(self, path, **kwargs):
+ return await self._request("GET", path,
+
+ # Enable websocket and ping once every ten seconds
+ websocket=True,
+ websocket_ping_interval=10,
+ websocket_ping_timeout=60,
+
+ **kwargs,
+ )
+
+ async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
+ websocket_ping_timeout=None, authenticate=True,
+ body=None, body_producer=None, on_message_callback=None, **kwargs):
+ headers = {}
+ query_args = {}
+
+ # Make absolute URL
+ url = urllib.parse.urljoin(self.url, path)
+
+ # Change scheme for websocket
+ if websocket and url.startswith("https://"):
+ url = url.replace("https://", "wss://")
+
+ # Filter all query arguments
+ for arg in kwargs:
+ # Skip anything that is None
+ if kwargs[arg] is None:
+ continue
+
+ # Add to query arguments
+ query_args[arg] = kwargs[arg]
+
+ # Encode query arguments
+ query_args = urllib.parse.urlencode(query_args, doseq=True)
+
+ # Add query arguments
+ if method in ("GET", "PUT", "DELETE"):
+ url = "%s?%s" % (url, query_args)
+
+ # Add any arguments to the body
+ elif method == "POST":
+ if body is None:
+ body = query_args
+
+ # Perform Kerberos authentication
+ if authenticate:
+ krb5_context = self._setup_krb5_context(url)
+
+ # Fetch the Kerberos client response
+ krb5_client_response = kerberos.authGSSClientResponse(krb5_context)
+
+ # Set the Negotiate header
+ headers |= {
+ "Authorization" : "Negotiate %s" % krb5_client_response,
+ }
+
+ # Make the request
+ req = tornado.httpclient.HTTPRequest(
+ method=method, url=url, headers=headers, body=body,
+
+ # Give the server more time to respond
+ request_timeout=60,
+
+ # Add all the rest
+ body_producer=body_producer,
+ )
+
+ # Is this a web socket request?
+ if websocket:
+ return await tornado.websocket.websocket_connect(
+ req,
+ ping_interval=websocket_ping_interval,
+ ping_timeout=websocket_ping_timeout,
+ on_message_callback=on_message_callback,
+ )
+
+ # Send the request and wait for a response
+ try:
+ res = await self.client.fetch(req)
+
+ # Catch any HTTP errors
+ except tornado.httpclient.HTTPError as e:
+ if e.code in (502, 503):
+ raise TemporaryConnectionError from e
+
+ # Re-raise anything else
+ raise e
+
+ # Perform mutual authentication
+ if authenticate:
+ for header in res.headers.get_list("WWW-Authenticate"):
+ # Skip anything that isn't a Negotiate header
+ if not header.startswith("Negotiate "):
+ continue
+
+ # Fetch the server response
+ krb5_server_response = header.removeprefix("Negotiate ")
+
+ # Validate the server response
+ result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
+ if not result == kerberos.AUTH_GSS_COMPLETE:
+ raise AuthError("Could not verify the Kerberos server response")
+
+			log.debug("Kerberos server response validation succeeded")
+
+			# Break so that we don't fall through to the else block
+ break
+
+ # If there were no headers
+ else:
+ raise AuthError("Mutual authentication failed")
+
+ # Decode JSON response
+ if res.body:
+ return json.loads(res.body)
+
+ # Empty response
+ return {}
+
+ async def _proxy(self, cls, *args, **kwargs):
+ conn = cls(self, *args, **kwargs)
+
+ # Create the initial connection
+ await conn.reconnect()
+
+ return conn
+
+ def _setup_krb5_context(self, url):
+ """
+ Creates the Kerberos context that can be used to perform client
+ authentication against the server, and mutual authentication for the server.
+ """
+ # Parse the input URL
+ url = urllib.parse.urlparse(url)
+
+ # Create a new client context
+ result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)
+
+ if not result == kerberos.AUTH_GSS_COMPLETE:
+ raise AuthError("Could not create Kerberos Client context")
+
+ # Next step...
+ try:
+ result = kerberos.authGSSClientStep(krb5_context, "")
+
+ except kerberos.GSSError as e:
+ log.error("Kerberos authentication failed: %s" % e)
+
+ raise AuthError("%s" % e) from e
+
+ if not result == kerberos.AUTH_GSS_CONTINUE:
+		raise AuthError("Could not continue Kerberos authentication")
+
+ return krb5_context
+
+ # Builder
+
+ async def control(self, *args, **kwargs):
+ """
+ Creates a control connection
+ """
+ return await self._proxy(ControlConnection, *args, **kwargs)
+
+ async def job(self, *args, **kwargs):
+ """
+ Creates a control connection for a certain job
+ """
+ return await self._proxy(JobControlConnection, *args, **kwargs)
+
+
+class Connection(object):
+ def __init__(self, service, *args, **kwargs):
+ self.service = service
+
+ # The active connection
+ self.conn = None
+
+ # Callbacks
+ self.callbacks = {}
+
+ # Perform custom initialization
+ self.init(*args, **kwargs)
+
+ def init(self, *args, **kwargs):
+ pass
+
+ @property
+ def url(self):
+ raise NotImplementedError
+
+ async def connect(self):
+ """
+ This will create a connection
+ """
+ return await self.service._socket(self.url,
+ on_message_callback=self.on_message_callback)
+
+ async def reconnect(self):
+ """
+ Tries to reconnect for forever
+ """
+ attempts = 0
+
+ while True:
+ attempts += 1
+
+ log.debug("Trying to reconnect (attempt %s)..." % attempts)
+
+ try:
+ self.conn = await self.connect()
+
+ # The web service responded with some error
+ except tornado.httpclient.HTTPClientError as e:
+ log.error("%s: Received HTTP Error %s" % (self.url, e.code))
+
+ # 502 - Proxy Error
+ # 503 - Service Unavailable
+ if e.code in (502, 503):
+ await asyncio.sleep(10)
+
+ # Raise any unhandled errors
+ else:
+ raise e
+
+ # The web service did not respond in time
+ except tornado.simple_httpclient.HTTPTimeoutError as e:
+ await asyncio.sleep(30)
+
+ # Raise all other exceptions
+ except Exception as e:
+ raise e
+
+ # If the connection was established successfully, we return
+ else:
+ return
+
+ def close(self):
+ """
+ Closes the connection
+ """
+ if self.conn:
+ self.conn.close()
+
+ def on_message_callback(self, message):
+ # Fail if no callbacks have been set
+ if not self.callbacks:
+ raise NotImplementedError
+
+ # Decode the message
+ message = self._decode_json_message(message)
+
+ # Ignore empty messages
+ if message is None:
+ return
+
+ # Log the received message
+ log.debug("Received message:\n%s" % json.dumps(message, indent=4))
+
+ # Fetch the message type & data
+ type = message.get("type")
+ data = message.get("data")
+
+ # Find a suitable callback
+ try:
+ callback = self.callbacks[type]
+
+ # Log an error for unknown messages and ignore them
+ except KeyError:
+ log.error("Received message of unknown type '%s'" % type)
+ return
+
+ # Call the callback
+ callback(data)
+
+ @staticmethod
+ def _decode_json_message(message):
+ """
+ Takes a received message and decodes it
+ """
+ # Ignore empty messages
+ if message is None:
+ return
+
+ try:
+ message = json.loads(message)
+ except json.JSONDecodeError:
+ log.error("Could not decode JSON message:\n%s" % message)
+ return
+
+ return message
+
+ async def write_message(self, message, **kwargs):
+ """
+ Sends a message but encodes it into JSON first
+ """
+ # This should never happen
+ if not self.conn:
+ raise RuntimeError("Not connected")
+
+ if isinstance(message, dict):
+ message = tornado.escape.json_encode(message)
+
+ try:
+ return await self.conn.write_message(message, **kwargs)
+
+ except tornado.websocket.WebSocketClosedError as e:
+ # Try to reconnect
+ await self.reconnect()
+
+ # Try to send the message again
+ return await self.write_message(message, **kwargs)
+
+
+class ControlConnection(Connection):
+ url = "/api/v1/builders/control"
+
+ def init(self, daemon):
+ self.daemon = daemon
+
+ # Callbacks
+ self.callbacks = {
+ "job" : self.daemon.job_received,
+ }
+
+ # Fetch processor information
+ self.cpu = cpuinfo.get_cpu_info()
+
+ # Fetch the native architecture
+ self.native_arch = _pakfire.native_arch()
+
+ async def submit_stats(self):
+ """
+ Sends stats about this builder
+ """
+ log.debug("Sending stats...")
+
+ # Fetch processor information
+ cpu_times = psutil.cpu_times_percent()
+
+ # Fetch memory/swap information
+ mem = psutil.virtual_memory()
+ swap = psutil.swap_memory()
+
+ # Fetch load average
+ loadavg = psutil.getloadavg()
+
+ await self.write_message({
+ "type" : "stats",
+ "data" : {
+ # CPU info
+ "cpu_model" : self.cpu.get("brand"),
+ "cpu_count" : self.cpu.get("count"),
+ "cpu_arch" : self.native_arch,
+
+ # Pakfire + OS
+ "pakfire_version" : PAKFIRE_VERSION,
+ "os_name" : util.get_distro_name(),
+
+ # CPU Times
+ "cpu_user" : cpu_times.user,
+ "cpu_nice" : cpu_times.nice,
+ "cpu_system" : cpu_times.system,
+ "cpu_idle" : cpu_times.idle,
+ "cpu_iowait" : cpu_times.iowait,
+ "cpu_irq" : cpu_times.irq,
+ "cpu_softirq" : cpu_times.softirq,
+ "cpu_steal" : cpu_times.steal,
+ "cpu_guest" : cpu_times.guest,
+ "cpu_guest_nice" : cpu_times.guest_nice,
+
+ # Load average
+ "loadavg1" : loadavg[0],
+ "loadavg5" : loadavg[1],
+ "loadavg15" : loadavg[2],
+
+ # Memory
+ "mem_total" : mem.total,
+ "mem_available" : mem.available,
+ "mem_used" : mem.used,
+ "mem_free" : mem.free,
+ "mem_active" : mem.active,
+ "mem_inactive" : mem.inactive,
+ "mem_buffers" : mem.buffers,
+ "mem_cached" : mem.cached,
+ "mem_shared" : mem.shared,
+
+ # Swap
+ "swap_total" : swap.total,
+ "swap_used" : swap.used,
+ "swap_free" : swap.free,
+ },
+ })
+
+
+class JobControlConnection(Connection):
+ """
+ Proxy for Build Jobs
+ """
+ def init(self, id, worker):
+ self.id = id
+
+ # Callbacks
+ self.callbacks = {
+ "abort" : worker.abort,
+ }
+
+ @property
+ def url(self):
+ return "/api/v1/jobs/%s" % self.id
+
+ async def finished(self, success, packages=None, logfile=None):
+ """
+ Will tell the hub that a job has finished
+ """
+ # Upload the log file
+ if logfile:
+ logfile = await self.service.upload(logfile, filename="%s.log" % self.id)
+
+ # Upload the packages
+ if packages:
+ for package in packages:
+ await self.service.upload(package)
+
+ while True:
+ try:
+ # Send the request
+ response = await self.service._request("POST", "/api/v1/jobs/%s/finished" % self.id,
+ success="1" if success else "0", logfile=logfile, packages=packages,
+ )
+
+ # Try again after a short moment on connection errors
+ except TemporaryConnectionError as e:
+ await asyncio.sleep(5)
+
+ else:
+ break
+
+ # Handle the response
+ # XXX TODO
+
+ async def log(self, timestamp, level, message):
+ """
+ Sends a log message to the hub
+ """
+ await self.write_message({
+ "type" : "log",
+ "data" : {
+ "timestamp" : timestamp,
+ "level" : level,
+ "message" : message,
+ },
+ })
+++ /dev/null
-#!/usr/bin/python3
-###############################################################################
-# #
-# Pakfire - The IPFire package management system #
-# Copyright (C) 2013 Pakfire development team #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <http://www.gnu.org/licenses/>. #
-# #
-###############################################################################
-
-import asyncio
-import cpuinfo
-import functools
-import hashlib
-import json
-import kerberos
-import logging
-import os.path
-import progressbar2 as progressbar
-import psutil
-import subprocess
-import tempfile
-import tornado.escape
-import tornado.httpclient
-import tornado.simple_httpclient
-import tornado.websocket
-import urllib.parse
-
-from . import _pakfire
-from . import util
-from .constants import *
-from .i18n import _
-
-# Setup logging
-log = logging.getLogger("pakfire.hub")
-
-# Configure some useful defaults for all requests
-tornado.httpclient.AsyncHTTPClient.configure(
- None, defaults = {
- "user_agent" : "pakfire/%s" % PAKFIRE_VERSION,
- },
-)
-
-class AuthError(Exception):
- """
- Raised when the client could not authenticate against the hub
- """
- pass
-
-
-class TransportError(Exception):
- pass
-
-
-class TemporaryConnectionError(TransportError):
- """
- Raised when there is a temporary connection issue and
- the request should be tried again.
- """
- pass
-
-
-class Hub(object):
- def __init__(self, url, keytab=None):
- self.url = url
-
- # Store path to keytab
- self.keytab = keytab
-
- # Initialise the HTTP client
- self.client = tornado.httpclient.AsyncHTTPClient()
-
- # XXX support proxies
-
- # Fetch a TGT with the given keytab
- if self.keytab:
- self._setup_credentials_cache()
-
- self._fetch_kerberos_ticket()
-
- def _setup_credentials_cache(self):
- """
- Create a temporary file to be used as Kerberos credentials cache
- """
- self.credentials_cache = tempfile.NamedTemporaryFile()
-
- os.environ["KRB5CCNAME"] = self.credentials_cache.name
-
- def _fetch_kerberos_ticket(self):
- command = ["kinit", "-k", "-t", self.keytab]
-
- p = subprocess.run(command, check=True, capture_output=True, text=True)
-
- async def _socket(self, path, **kwargs):
- return await self._request("GET", path,
-
- # Enable websocket and ping once every ten seconds
- websocket=True,
- websocket_ping_interval=10,
- websocket_ping_timeout=60,
-
- **kwargs,
- )
-
- async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
- websocket_ping_timeout=None, authenticate=True,
- body=None, body_producer=None, on_message_callback=None, **kwargs):
- headers = {}
- query_args = {}
-
- # Make absolute URL
- url = urllib.parse.urljoin(self.url, path)
-
- # Change scheme for websocket
- if websocket and url.startswith("https://"):
- url = url.replace("https://", "wss://")
-
- # Filter all query arguments
- for arg in kwargs:
- # Skip anything that is None
- if kwargs[arg] is None:
- continue
-
- # Add to query arguments
- query_args[arg] = kwargs[arg]
-
- # Encode query arguments
- query_args = urllib.parse.urlencode(query_args, doseq=True)
-
- # Add query arguments
- if method in ("GET", "PUT", "DELETE"):
- url = "%s?%s" % (url, query_args)
-
- # Add any arguments to the body
- elif method == "POST":
- if body is None:
- body = query_args
-
- # Perform Kerberos authentication
- if authenticate:
- krb5_context = self._setup_krb5_context(url)
-
- # Fetch the Kerberos client response
- krb5_client_response = kerberos.authGSSClientResponse(krb5_context)
-
- # Set the Negotiate header
- headers |= {
- "Authorization" : "Negotiate %s" % krb5_client_response,
- }
-
- # Make the request
- req = tornado.httpclient.HTTPRequest(
- method=method, url=url, headers=headers, body=body,
-
- # Give the server more time to respond
- request_timeout=60,
-
- # Add all the rest
- body_producer=body_producer,
- )
-
- # Is this a web socket request?
- if websocket:
- return await tornado.websocket.websocket_connect(
- req,
- ping_interval=websocket_ping_interval,
- ping_timeout=websocket_ping_timeout,
- on_message_callback=on_message_callback,
- )
-
- # Send the request and wait for a response
- try:
- res = await self.client.fetch(req)
-
- # Catch any HTTP errors
- except tornado.httpclient.HTTPError as e:
- if e.code in (502, 503):
- raise TemporaryConnectionError from e
-
- # Re-raise anything else
- raise e
-
- # Perform mutual authentication
- if authenticate:
- for header in res.headers.get_list("WWW-Authenticate"):
- # Skip anything that isn't a Negotiate header
- if not header.startswith("Negotiate "):
- continue
-
- # Fetch the server response
- krb5_server_response = header.removeprefix("Negotiate ")
-
- # Validate the server response
- result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
- if not result == kerberos.AUTH_GSS_COMPLETE:
- raise AuthError("Could not verify the Kerberos server response")
-
- log.debug("Kerberos Server Response validating succeeded")
-
- # Call this so that we won't end in the else block
- break
-
- # If there were no headers
- else:
- raise AuthError("Mutual authentication failed")
-
- # Decode JSON response
- if res.body:
- return json.loads(res.body)
-
- # Empty response
- return {}
-
- async def _proxy(self, cls, *args, **kwargs):
- conn = cls(self, *args, **kwargs)
-
- # Create the initial connection
- await conn.reconnect()
-
- return conn
-
- def _setup_krb5_context(self, url):
- """
- Creates the Kerberos context that can be used to perform client
- authentication against the server, and mutual authentication for the server.
- """
- # Parse the input URL
- url = urllib.parse.urlparse(url)
-
- # Create a new client context
- result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)
-
- if not result == kerberos.AUTH_GSS_COMPLETE:
- raise AuthError("Could not create Kerberos Client context")
-
- # Next step...
- try:
- result = kerberos.authGSSClientStep(krb5_context, "")
-
- except kerberos.GSSError as e:
- log.error("Kerberos authentication failed: %s" % e)
-
- raise AuthError("%s" % e) from e
-
- if not result == kerberos.AUTH_GSS_CONTINUE:
- raise AuthError("Cloud not continue Kerberos authentication")
-
- return krb5_context
-
- # Uploads
-
- async def upload(self, path, filename=None, show_progress=True):
- """
- Uploads the file to the hub returning the upload ID
- """
- log.debug("Uploading %s..." % path)
-
- # Use the basename of the file if no name was given
- if filename is None:
- filename = os.path.basename(path)
-
- # Determine the filesize
- size = os.path.getsize(path)
-
- # Make progressbar
- if show_progress:
- p = progressbar.ProgressBar(
- max_value=size,
- widgets=[
- progressbar.FormatCustomText(_("Uploading %s") % filename),
- progressbar.Percentage(),
- progressbar.Bar(),
- progressbar.FileTransferSpeed(),
- progressbar.DataSize(),
- progressbar.AdaptiveETA(),
- ],
- )
- else:
- p = None
-
- # Compute a digest
- digest = self._compute_digest("blake2b", path)
-
- while True:
- # Prepare the file for streaming
- body_producer = functools.partial(self._stream_file, path, size, p)
-
- # Perform upload
- try:
- response = await self._request("PUT", "/api/v1/uploads",
- body_producer=body_producer,
- filename=filename, size=size, digest=digest
- )
-
- # On temporary issues, try again after a few seconds
- except TemporaryConnectionError as e:
- await asyncio.sleep(5)
-
- else:
- break
-
- # Return the upload ID
- return response.get("id")
-
- async def delete_upload(self, upload_id):
- await self._request("DELETE", "/api/v1/uploads/%s" % upload_id)
-
- async def upload_multi(self, *paths, show_progress=True):
- """
- Upload multiple files
-
- If one file could not be uploaded, any other uploads will be deleted
- """
- uploads = []
-
- # Upload everything
- try:
- for path in paths:
- upload = await self.upload(path, show_progress=show_progress)
-
- # Store the upload ID
- uploads.append(upload)
-
- except Exception as e:
- # Remove any previous uploads
- await asyncio.gather(
- *(self.delete_upload(upload) for upload in uploads),
- )
-
- # Raise the exception
- raise e
-
- # Return the IDs of the uploads
- return uploads
-
- @staticmethod
- def _stream_file(path, size, p, write):
- try:
- with open(path, "rb") as f:
- while True:
- buf = f.read(64 * 1024)
- if not buf:
- break
-
- # Update progressbar
- if p:
- l = len(buf)
- p.increment(l)
-
- write(buf)
- finally:
- # Finish the progressbar
- if p:
- p.finish()
-
- @staticmethod
- def _compute_digest(algo, path):
- h = hashlib.new(algo)
-
- with open(path, "rb") as f:
- while True:
- buf = f.read(64 * 1024)
- if not buf:
- break
-
- h.update(buf)
-
- return "%s:%s" % (algo, h.hexdigest())
-
- @staticmethod
- def _decode_json_message(message):
- """
- Takes a received message and decodes it.
-
- It will then call the callback with the decoded message.
- """
- # Ignore empty messages
- if message is None:
- return
-
- try:
- message = json.loads(message)
- except json.JSONDecodeError:
- log.error("Could not decode JSON message:\n%s" % message)
- return
-
- return message
-
- # Builder
-
- async def control(self, *args, **kwargs):
- """
- Creates a control connection
- """
- return await self._proxy(ControlConnection, *args, **kwargs)
-
- async def job(self, *args, **kwargs):
- """
- Creates a control connection for a certain job
- """
- return await self._proxy(JobControlConnection, *args, **kwargs)
-
-
-class HubObject(object):
- # Disable Nagle's algorithm?
- nodelay = False
-
- def __init__(self, hub, *args, **kwargs):
- self.hub = hub
-
- # The active connection
- self.conn = None
-
- # Callbacks
- self.callbacks = {}
-
- # Perform custom initialization
- self.init(*args, **kwargs)
-
- def init(self, *args, **kwargs):
- pass
-
- @property
- def url(self):
- raise NotImplementedError
-
- async def connect(self):
- """
- This will create a connection
- """
- conn = await self.hub._socket(self.url,
- on_message_callback=self.on_message_callback)
-
- # Disable Nagle's algorithm
- if self.nodelay:
- conn.set_nodelay(True)
-
- return conn
-
- async def reconnect(self):
- """
- Tries to reconnect for forever
- """
- attempts = 0
-
- while True:
- attempts += 1
-
- log.debug("Trying to reconnect (attempt %s)..." % attempts)
-
- try:
- self.conn = await self.connect()
-
- # The web service responded with some error
- except tornado.httpclient.HTTPClientError as e:
- log.error("%s: Received HTTP Error %s" % (self.url, e.code))
-
- # 502 - Proxy Error
- # 503 - Service Unavailable
- if e.code in (502, 503):
- await asyncio.sleep(10)
-
- # Raise any unhandled errors
- else:
- raise e
-
- # The web service did not respond in time
- except tornado.simple_httpclient.HTTPTimeoutError as e:
- await asyncio.sleep(30)
-
- # Raise all other exceptions
- except Exception as e:
- raise e
-
- # If the connection was established successfully, we return
- else:
- return
-
- def close(self):
- """
- Closes the connection
- """
- if self.conn:
- self.conn.close()
-
- def on_message_callback(self, message):
- # Fail if no callbacks have been set
- if not self.callbacks:
- raise NotImplementedError
-
- # Decode the message
- message = self.hub._decode_json_message(message)
-
- # Ignore empty messages
- if message is None:
- return
-
- # Log the received message
- log.debug("Received message:\n%s" % json.dumps(message, indent=4))
-
- # Fetch the message type & data
- type = message.get("type")
- data = message.get("data")
-
- # Find a suitable callback
- try:
- callback = self.callbacks[type]
-
- # Log an error for unknown messages and ignore them
- except KeyError:
- log.error("Received message of unknown type '%s'" % type)
- return
-
- # Call the callback
- callback(data)
-
- async def write_message(self, message, **kwargs):
- """
- Sends a message but encodes it into JSON first
- """
- # This should never happen
- if not self.conn:
- raise RuntimeError("Not connected")
-
- if isinstance(message, dict):
- message = tornado.escape.json_encode(message)
-
- try:
- return await self.conn.write_message(message, **kwargs)
-
- except tornado.websocket.WebSocketClosedError as e:
- # Try to reconnect
- await self.reconnect()
-
- # Try to send the message again
- return await self.write_message(message, **kwargs)
-
-
-class ControlConnection(HubObject):
- url = "/api/v1/builders/control"
-
- def init(self, daemon):
- self.daemon = daemon
-
- # Callbacks
- self.callbacks = {
- "job" : self.daemon.job_received,
- }
-
- # Fetch processor information
- self.cpu = cpuinfo.get_cpu_info()
-
- # Fetch the native architecture
- self.native_arch = _pakfire.native_arch()
-
- async def submit_stats(self):
- """
- Sends stats about this builder
- """
- log.debug("Sending stats...")
-
- # Fetch processor information
- cpu_times = psutil.cpu_times_percent()
-
- # Fetch memory/swap information
- mem = psutil.virtual_memory()
- swap = psutil.swap_memory()
-
- # Fetch load average
- loadavg = psutil.getloadavg()
-
- await self.write_message({
- "type" : "stats",
- "data" : {
- # CPU info
- "cpu_model" : self.cpu.get("brand"),
- "cpu_count" : self.cpu.get("count"),
- "cpu_arch" : self.native_arch,
-
- # Pakfire + OS
- "pakfire_version" : PAKFIRE_VERSION,
- "os_name" : util.get_distro_name(),
-
- # CPU Times
- "cpu_user" : cpu_times.user,
- "cpu_nice" : cpu_times.nice,
- "cpu_system" : cpu_times.system,
- "cpu_idle" : cpu_times.idle,
- "cpu_iowait" : cpu_times.iowait,
- "cpu_irq" : cpu_times.irq,
- "cpu_softirq" : cpu_times.softirq,
- "cpu_steal" : cpu_times.steal,
- "cpu_guest" : cpu_times.guest,
- "cpu_guest_nice" : cpu_times.guest_nice,
-
- # Load average
- "loadavg1" : loadavg[0],
- "loadavg5" : loadavg[1],
- "loadavg15" : loadavg[2],
-
- # Memory
- "mem_total" : mem.total,
- "mem_available" : mem.available,
- "mem_used" : mem.used,
- "mem_free" : mem.free,
- "mem_active" : mem.active,
- "mem_inactive" : mem.inactive,
- "mem_buffers" : mem.buffers,
- "mem_cached" : mem.cached,
- "mem_shared" : mem.shared,
-
- # Swap
- "swap_total" : swap.total,
- "swap_used" : swap.used,
- "swap_free" : swap.free,
- },
- })
-
-
-class JobControlConnection(HubObject):
- """
- Proxy for Build Jobs
- """
- def init(self, id, worker):
- self.id = id
-
- # Callbacks
- self.callbacks = {
- "abort" : worker.abort,
- }
-
- @property
- def url(self):
- return "/api/v1/jobs/%s" % self.id
-
- async def finished(self, success, packages=None, logfile=None):
- """
- Will tell the hub that a job has finished
- """
- # Upload the log file
- if logfile:
- logfile = await self.hub.upload(logfile, filename="%s.log" % self.id)
-
- # Upload the packages
- if packages:
- packages = await self.hub.upload_multi(*packages)
-
- while True:
- try:
- # Send the request
- response = await self.hub._request("POST", "/api/v1/jobs/%s/finished" % self.id,
- success="1" if success else "0", logfile=logfile, packages=packages,
- )
-
- # Try again after a short moment on connection errors
- except TemporaryConnectionError as e:
- await asyncio.sleep(5)
-
- else:
- break
-
- # Handle the response
- # XXX TODO
-
- async def log(self, timestamp, level, message):
- """
- Sends a log message to the hub
- """
- await self.write_message({
- "type" : "log",
- "data" : {
- "timestamp" : timestamp,
- "level" : level,
- "message" : message,
- },
- })