From 1479df350c89729c7d2c723a05294c41cdb80fd9 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Tue, 25 Apr 2023 14:53:32 +0000 Subject: [PATCH] hub: Refactor communication with the hub after merging pakfire-web and -hub Signed-off-by: Michael Tremer --- src/pakfire/daemon.py | 17 +- src/pakfire/hub.py | 354 ++++++++++++++++++++++++++++-------------- 2 files changed, 246 insertions(+), 125 deletions(-) diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index f139e3707..c2c267d6a 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -47,6 +47,9 @@ class Daemon(object): # List of worker processes. self.workers = [] + # Stats Connection + self.stats = None + def connect_to_hub(self): url = self.config.get("daemon", "server", PAKFIRE_HUB) @@ -67,7 +70,7 @@ class Daemon(object): self._shutdown_signalled = asyncio.Event() # Send builder information - await self.hub.send_builder_info() + self.stats = await self.hub.builder_stats() # Join the job queue self.queue = await self.hub.queue(self.job_received) @@ -82,7 +85,8 @@ class Daemon(object): pass # Send some information about this builder - await self.hub.send_builder_stats() + if self.stats: + await self.stats.submit() # Main loop has ended, but we wait until all workers have finished. self.terminate_all_workers() @@ -242,7 +246,7 @@ class Worker(multiprocessing.Process): # Create a temporary directory in which the built packages will be copied with tempfile.TemporaryDirectory(prefix="pakfire-packages-") as target: - packages = None + packages = [] # Run the build try: @@ -269,10 +273,9 @@ class Worker(multiprocessing.Process): # Notify the hub that the job has finished finally: - await self.hub.finish_job( - job_id, + await self.job.finished( success=success, - log=logger.logfile.name, + logfile=logger.logfile.name, packages=packages, ) @@ -379,8 +382,6 @@ class BuildLogger(object): return self.logger.log(level, message) async def stream(self, timeout=0): - self.log.debug("Log streamer started") - while True: # Fetch a message from the queue try: diff --git a/src/pakfire/hub.py b/src/pakfire/hub.py index b334e096a..8545eb2a6 100644 --- a/src/pakfire/hub.py +++ b/src/pakfire/hub.py @@ -19,6 +19,7 @@ # # ############################################################################### +import asyncio import cpuinfo import functools import hashlib @@ -27,8 +28,11 @@ import kerberos import logging import os.path import psutil +import subprocess +import tempfile import tornado.escape import tornado.httpclient +import tornado.simple_httpclient import tornado.websocket import urllib.parse @@ -68,7 +72,19 @@ class Hub(object): # XXX support proxies - async def _request(self, method, path, websocket=False, authenticate=True, + async def _socket(self, path, **kwargs): + return await self._request("GET", path, + + # Enable websocket and ping once every ten seconds + websocket=True, + websocket_ping_interval=10, + websocket_ping_timeout=60, + + **kwargs, + ) + + async def _request(self, method, path, websocket=False, websocket_ping_interval=None, + websocket_ping_timeout=None, authenticate=True, body=None, body_producer=None, on_message_callback=None, **kwargs): headers = {} query_args = {} @@ -125,6 +141,8 @@ class Hub(object): if websocket: return await tornado.websocket.websocket_connect( req, + ping_interval=websocket_ping_interval, + ping_timeout=websocket_ping_timeout, on_message_callback=on_message_callback, ) @@ -164,6 +182,14 @@ class Hub(object): # Empty response return {} + async def _proxy(self, cls, *args, **kwargs): + conn = cls(self, *args, **kwargs) + + # Create the initial connection + await conn.reconnect() + + return conn + def _setup_krb5_context(self, url): """ Creates the Kerberos context that can be used to perform client @@ -203,7 +229,7 @@ class Hub(object): # Build actions def get_build(self, uuid): - return self._request("/builds/%s" % uuid, decode="json") + return self._request("/api/v1/builds/%s" % uuid, decode="json") async def build(self, path, repo=None, arches=None): """ @@ -215,7 +241,7 @@ class Hub(object): log.debug("%s has been uploaded as %s" % (path, upload_id)) # Create a new build - build_id = await self._request("POST", "/builds", + build_id = await self._request("POST", "/api/v1/builds", upload_id=upload_id, repo=repo, arches=arches) log.debug("Build creates as %s" % build_id) @@ -238,7 +264,7 @@ class Hub(object): """ Returns a list of all uploads """ - response = await self._request("GET", "/uploads") + response = await self._request("GET", "/api/v1/uploads") return response.get("uploads") @@ -275,7 +301,7 @@ class Hub(object): body_producer = functools.partial(self._stream_file, path, size, p) # Perform upload - response = await self._request("PUT", "/uploads", + response = await self._request("PUT", "/api/v1/uploads", body_producer=body_producer, filename=filename, size=size, digest=digest ) @@ -284,7 +310,7 @@ class Hub(object): return response.get("id") async def delete_upload(self, upload_id): - await self._request("DELETE", "/uploads", id=upload_id) + await self._request("DELETE", "/api/v1/uploads/%s" % upload_id) async def upload_multi(self, *paths, show_progress=True): """ @@ -354,37 +380,172 @@ class Hub(object): # Builder - async def send_builder_info(self): + async def builder_stats(self): """ Sends information about this host to the hub. This information is something that doesn't change during the lifetime of the daemon. """ - log.info(_("Sending builder information to hub...")) + return await self._proxy(BuilderStats) - # Fetch processor information - cpu = cpuinfo.get_cpu_info() + async def queue(self, job_received_callback): + """ + Connects to the hub and asks for a build job + """ + on_message_callback = functools.partial( + self._decode_json_message, job_received_callback, + ) - data = { - # CPU info - "cpu_model" : cpu.get("brand_raw"), - "cpu_count" : cpu.get("count"), - "cpu_arch" : _pakfire.native_arch(), + return await self._proxy(QueueProxy, on_message_callback=on_message_callback) - # Pakfire + OS - "pakfire_version" : PAKFIRE_VERSION, - "os_name" : util.get_distro_name(), - } + @staticmethod + def _decode_json_message(callback, message): + """ + Takes a received message and decodes it. + + It will then call the callback with the decoded message. + """ + # Ignore empty messages + if message is None: + return + + try: + message = json.loads(message) + except json.JSONDecodeError: + log.error("Could not decode JSON message:\n%s" % message) + return + + return callback(message) + + async def job(self, job_id): + """ + Connects to the given job + """ + # Return a Job proxy + return await self._proxy(Job, job_id) + + +class HubObject(object): + # Disable Nagle's algorithm? + nodelay = False + + def __init__(self, hub, *args, on_message_callback=None, **kwargs): + self.hub = hub + + # The active connection + self.conn = None + + # Callbacks + self.on_message_callback = on_message_callback + + # Perform custom initialization + self.init(*args, **kwargs) + + def init(self, *args, **kwargs): + pass + + @property + def url(self): + raise NotImplementedError + + async def connect(self): + """ + This will create a connection + """ + conn = await self.hub._socket(self.url, + on_message_callback=self.on_message_callback) + + # Disable Nagle's algorithm + if self.nodelay: + conn.set_nodelay(True) + + return conn + + async def reconnect(self): + """ + Tries to reconnect for forever + """ + attempts = 0 + + while True: + attempts += 1 + + log.debug("Trying to reconnect (attempt %s)..." % attempts) + + try: + self.conn = await self.connect() + + # The web service responded with some error + except tornado.httpclient.HTTPClientError as e: + log.error("%s: Received HTTP Error %s" % (self.url, e.code)) + + # If the service is down we will retry in 10 seconds + if e.code == 503: + await asyncio.sleep(10) + + # Raise any unhandled errors + else: + raise e + + # The web service did not respond in time + except tornado.simple_httpclient.HTTPTimeoutError as e: + await asyncio.sleep(30) + + # Raise all other exceptions + except Exception as e: + raise e + + # If the connection was established successfully, we return + else: + return + + async def write_message(self, message, **kwargs): + """ + Sends a message but encodes it into JSON first + """ + # This should never happen + if not self.conn: + raise RuntimeError("Not connected") + + if isinstance(message, dict): + message = tornado.escape.json_encode(message) + + try: + return await self.conn.write_message(message, **kwargs) - # Send request - await self._request("POST", "/builders/info", **data) + except tornado.websocket.WebSocketClosedError as e: + # Try to reconnect + await self.reconnect() - async def send_builder_stats(self): + # Try to send the message again + return await self.write_message(message, **kwargs) + + +class QueueProxy(HubObject): + url = "/api/v1/jobs/queue" + + +class BuilderStats(HubObject): + """ + Proxy for Builder Stats + """ + def init(self): + # Fetch processor information + self.cpu = cpuinfo.get_cpu_info() + + # Fetch the native architecture + self.native_arch = _pakfire.native_arch() + + @property + def url(self): + return "/api/v1/builders/stats" + + async def submit(self): log.debug("Sending stat message to hub...") # Fetch processor information - cpu = psutil.cpu_times_percent() + cpu_times = psutil.cpu_times_percent() # Fetch memory/swap information mem = psutil.virtual_memory() @@ -393,18 +554,27 @@ class Hub(object): # Fetch load average loadavg = psutil.getloadavg() - data = { - # CPU - "cpu_user" : cpu.user, - "cpu_nice" : cpu.nice, - "cpu_system" : cpu.system, - "cpu_idle" : cpu.idle, - "cpu_iowait" : cpu.iowait, - "cpu_irq" : cpu.irq, - "cpu_softirq" : cpu.softirq, - "cpu_steal" : cpu.steal, - "cpu_guest" : cpu.guest, - "cpu_guest_nice" : cpu.guest_nice, + await self.write_message({ + # CPU info + "cpu_model" : self.cpu.get("brand_raw"), + "cpu_count" : self.cpu.get("count"), + "cpu_arch" : self.native_arch, + + # Pakfire + OS + "pakfire_version" : PAKFIRE_VERSION, + "os_name" : util.get_distro_name(), + + # CPU Times + "cpu_user" : cpu_times.user, + "cpu_nice" : cpu_times.nice, + "cpu_system" : cpu_times.system, + "cpu_idle" : cpu_times.idle, + "cpu_iowait" : cpu_times.iowait, + "cpu_irq" : cpu_times.irq, + "cpu_softirq" : cpu_times.softirq, + "cpu_steal" : cpu_times.steal, + "cpu_guest" : cpu_times.guest, + "cpu_guest_nice" : cpu_times.guest_nice, # Load average "loadavg1" : loadavg[0], @@ -426,109 +596,59 @@ class Hub(object): "swap_total" : swap.total, "swap_used" : swap.used, "swap_free" : swap.free, - } - - # Send request - await self._request("POST", "/builders/stats", **data) - - async def queue(self, job_received_callback): - """ - Connects to the hub and asks for a build job - """ - on_message_callback = functools.partial( - self._decode_json_message, job_received_callback) - - # Join the queue - queue = await self._request("GET", "/queue", websocket=True, ping=10, - on_message_callback=on_message_callback) - - log.debug("Joined the queue") - - return queue - - @staticmethod - def _decode_json_message(callback, message): - """ - Takes a received message and decodes it. + }) - It will then call the callback with the decoded message. - """ - # Ignore empty messages - if message is None: - return - try: - message = json.loads(message) - except json.JSONDecodeError: - log.error("Could not decode JSON message:\n%s" % message) - return +class Job(HubObject): + """ + Proxy for Build Jobs + """ + def init(self, id): + self.id = id - return callback(message) + @property + def url(self): + return "/api/v1/jobs/%s" % self.id - async def job(self, id): + async def status(self, status): """ - Connects to the given job + Sends a new status to the hub """ - # Connect to the hub - conn = await self._request("GET", "/jobs/%s/builder" % id, - websocket=True, ping=10) - - # Return a Job proxy - return Job(self, id, conn) + await self.write_message({ + "message" : "status", + "status" : status, + }) - async def finish_job(self, job_id, success, packages=None, log=None): + async def finished(self, success, packages=None, logfile=None): """ Will tell the hub that a job has finished """ # Upload the log file - if log: - log = await self.upload(log, filename="%s.log" % job_id) + if logfile: + logfile = await self.hub.upload(logfile, filename="%s.log" % self.id) # Upload the packages if packages: - packages = await self.upload_multi(*packages) + packages = await self.hub.upload_multi(*packages) # Send the request - response = await self._request("POST", "/jobs/%s/finished" % job_id, - success=success, log=log, packages=packages) + await self.write_message({ + "message" : "finished", + "success" : success, + "logfile" : logfile, + "packages" : packages, + }) # Handle the response # XXX TODO - -class Job(object): - """ - Proxy for Build Jobs - """ - def __init__(self, hub, id, conn): - self.hub = hub - self.id = id - self.conn = conn - - async def _write_message(self, message, **kwargs): - """ - Sends a message but encodes it into JSON first - """ - if isinstance(message, dict): - message = tornado.escape.json_encode(message) - - return await self.conn.write_message(message, **kwargs) - - async def status(self, status): - """ - Sends a new status to the hub - """ - await self._write_message({ - "message" : "status", - "status" : status, - }) - - async def log(self, level, message): + async def log(self, timestamp, level, message): """ Sends a log message to the hub """ - await self._write_message({ - "message" : "log", - "level" : level, - "log" : message, + await self.write_message({ + "message" : "log", + "timestamp" : timestamp, + "level" : level, + "log" : message, }) -- 2.39.5