]> git.ipfire.org Git - pakfire.git/commitdiff
hub: Refactor communication with the hub after merging pakfire-web and -hub
authorMichael Tremer <michael.tremer@ipfire.org>
Tue, 25 Apr 2023 14:53:32 +0000 (14:53 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 26 Apr 2023 15:14:29 +0000 (15:14 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.py
src/pakfire/hub.py

index f139e37079fbceb5984be8a37d7099e786c8c312..c2c267d6a30c6ca151071d7323f4331898674af9 100644 (file)
@@ -47,6 +47,9 @@ class Daemon(object):
                # List of worker processes.
                self.workers = []
 
+               # Stats Connection
+               self.stats = None
+
        def connect_to_hub(self):
                url = self.config.get("daemon", "server", PAKFIRE_HUB)
 
@@ -67,7 +70,7 @@ class Daemon(object):
                self._shutdown_signalled = asyncio.Event()
 
                # Send builder information
-               await self.hub.send_builder_info()
+               self.stats = await self.hub.builder_stats()
 
                # Join the job queue
                self.queue = await self.hub.queue(self.job_received)
@@ -82,7 +85,8 @@ class Daemon(object):
                                pass
 
                        # Send some information about this builder
-                       await self.hub.send_builder_stats()
+                       if self.stats:
+                               await self.stats.submit()
 
                # Main loop has ended, but we wait until all workers have finished.
                self.terminate_all_workers()
@@ -242,7 +246,7 @@ class Worker(multiprocessing.Process):
 
                # Create a temporary directory in which the built packages will be copied
                with tempfile.TemporaryDirectory(prefix="pakfire-packages-") as target:
-                       packages = None
+                       packages = []
 
                        # Run the build
                        try:
@@ -269,10 +273,9 @@ class Worker(multiprocessing.Process):
 
                        # Notify the hub that the job has finished
                        finally:
-                               await self.hub.finish_job(
-                                       job_id,
+                               await self.job.finished(
                                        success=success,
-                                       log=logger.logfile.name,
+                                       logfile=logger.logfile.name,
                                        packages=packages,
                                )
 
@@ -379,8 +382,6 @@ class BuildLogger(object):
                return self.logger.log(level, message)
 
        async def stream(self, timeout=0):
-               self.log.debug("Log streamer started")
-
                while True:
                        # Fetch a message from the queue
                        try:
index b334e096a57db6470213cb23c50c394e40a2af56..8545eb2a69e7039e857e3d0641ccb8fec1be13ba 100644 (file)
@@ -19,6 +19,7 @@
 #                                                                             #
 ###############################################################################
 
+import asyncio
 import cpuinfo
 import functools
 import hashlib
@@ -27,8 +28,11 @@ import kerberos
 import logging
 import os.path
 import psutil
+import subprocess
+import tempfile
 import tornado.escape
 import tornado.httpclient
+import tornado.simple_httpclient
 import tornado.websocket
 import urllib.parse
 
@@ -68,7 +72,19 @@ class Hub(object):
 
                # XXX support proxies
 
-       async def _request(self, method, path, websocket=False, authenticate=True,
+       async def _socket(self, path, **kwargs):
+               return await self._request("GET", path,
+
+                       # Enable websocket and ping once every ten seconds
+                       websocket=True,
+                       websocket_ping_interval=10,
+                       websocket_ping_timeout=60,
+
+                       **kwargs,
+               )
+
+       async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
+                       websocket_ping_timeout=None, authenticate=True,
                        body=None, body_producer=None, on_message_callback=None, **kwargs):
                headers = {}
                query_args = {}
@@ -125,6 +141,8 @@ class Hub(object):
                if websocket:
                        return await tornado.websocket.websocket_connect(
                                req,
+                               ping_interval=websocket_ping_interval,
+                               ping_timeout=websocket_ping_timeout,
                                on_message_callback=on_message_callback,
                        )
 
@@ -164,6 +182,14 @@ class Hub(object):
                # Empty response
                return {}
 
+       async def _proxy(self, cls, *args, **kwargs):
+               conn = cls(self, *args, **kwargs)
+
+               # Create the initial connection
+               await conn.reconnect()
+
+               return conn
+
        def _setup_krb5_context(self, url):
                """
                        Creates the Kerberos context that can be used to perform client
@@ -203,7 +229,7 @@ class Hub(object):
        # Build actions
 
        def get_build(self, uuid):
-               return self._request("/builds/%s" % uuid, decode="json")
+               return self._request("/api/v1/builds/%s" % uuid, decode="json")
 
        async def build(self, path, repo=None, arches=None):
                """
@@ -215,7 +241,7 @@ class Hub(object):
                log.debug("%s has been uploaded as %s" % (path, upload_id))
 
                # Create a new build
-               build_id = await self._request("POST", "/builds",
+               build_id = await self._request("POST", "/api/v1/builds",
                        upload_id=upload_id, repo=repo, arches=arches)
 
                log.debug("Build creates as %s" % build_id)
@@ -238,7 +264,7 @@ class Hub(object):
                """
                        Returns a list of all uploads
                """
-               response = await self._request("GET", "/uploads")
+               response = await self._request("GET", "/api/v1/uploads")
 
                return response.get("uploads")
 
@@ -275,7 +301,7 @@ class Hub(object):
                body_producer = functools.partial(self._stream_file, path, size, p)
 
                # Perform upload
-               response = await self._request("PUT", "/uploads",
+               response = await self._request("PUT", "/api/v1/uploads",
                        body_producer=body_producer,
                        filename=filename, size=size, digest=digest
                )
@@ -284,7 +310,7 @@ class Hub(object):
                return response.get("id")
 
        async def delete_upload(self, upload_id):
-               await self._request("DELETE", "/uploads", id=upload_id)
+               await self._request("DELETE", "/api/v1/uploads/%s" % upload_id)
 
        async def upload_multi(self, *paths, show_progress=True):
                """
@@ -354,37 +380,172 @@ class Hub(object):
 
        # Builder
 
-       async def send_builder_info(self):
+       async def builder_stats(self):
                """
                        Sends information about this host to the hub.
 
                        This information is something that doesn't change during
                        the lifetime of the daemon.
                """
-               log.info(_("Sending builder information to hub..."))
+               return await self._proxy(BuilderStats)
 
-               # Fetch processor information
-               cpu = cpuinfo.get_cpu_info()
+       async def queue(self, job_received_callback):
+               """
+                       Connects to the hub and asks for a build job
+               """
+               on_message_callback = functools.partial(
+                       self._decode_json_message, job_received_callback,
+               )
 
-               data = {
-                       # CPU info
-                       "cpu_model"       : cpu.get("brand_raw"),
-                       "cpu_count"       : cpu.get("count"),
-                       "cpu_arch"        : _pakfire.native_arch(),
+               return await self._proxy(QueueProxy, on_message_callback=on_message_callback)
 
-                       # Pakfire + OS
-                       "pakfire_version" : PAKFIRE_VERSION,
-                       "os_name"         : util.get_distro_name(),
-               }
+       @staticmethod
+       def _decode_json_message(callback, message):
+               """
+                       Takes a received message and decodes it.
+
+                       It will then call the callback with the decoded message.
+               """
+               # Ignore empty messages
+               if message is None:
+                       return
+
+               try:
+                       message = json.loads(message)
+               except json.JSONDecodeError:
+                       log.error("Could not decode JSON message:\n%s" % message)
+                       return
+
+               return callback(message)
+
+       async def job(self, job_id):
+               """
+                       Connects to the given job
+               """
+               # Return a Job proxy
+               return await self._proxy(Job, job_id)
+
+
+class HubObject(object):
+       # Disable Nagle's algorithm?
+       nodelay = False
+
+       def __init__(self, hub, *args, on_message_callback=None, **kwargs):
+               self.hub = hub
+
+               # The active connection
+               self.conn = None
+
+               # Callbacks
+               self.on_message_callback = on_message_callback
+
+               # Perform custom initialization
+               self.init(*args, **kwargs)
+
+       def init(self, *args, **kwargs):
+               pass
+
+       @property
+       def url(self):
+               raise NotImplementedError
+
+       async def connect(self):
+               """
+                       This will create a connection
+               """
+               conn = await self.hub._socket(self.url,
+                       on_message_callback=self.on_message_callback)
+
+               # Disable Nagle's algorithm
+               if self.nodelay:
+                       conn.set_nodelay(True)
+
+               return conn
+
+       async def reconnect(self):
+               """
+                       Tries to reconnect for forever
+               """
+               attempts = 0
+
+               while True:
+                       attempts += 1
+
+                       log.debug("Trying to reconnect (attempt %s)..." % attempts)
+
+                       try:
+                               self.conn = await self.connect()
+
+                       # The web service responded with some error
+                       except tornado.httpclient.HTTPClientError as e:
+                               log.error("%s: Received HTTP Error %s" % (self.url, e.code))
+
+                               # If the service is down we will retry in 10 seconds
+                               if e.code == 503:
+                                       await asyncio.sleep(10)
+
+                               # Raise any unhandled errors
+                               else:
+                                       raise e
+
+                       # The web service did not respond in time
+                       except tornado.simple_httpclient.HTTPTimeoutError as e:
+                               await asyncio.sleep(30)
+
+                       # Raise all other exceptions
+                       except Exception as e:
+                               raise e
+
+                       # If the connection was established successfully, we return
+                       else:
+                               return
+
+       async def write_message(self, message, **kwargs):
+               """
+                       Sends a message but encodes it into JSON first
+               """
+               # This should never happen
+               if not self.conn:
+                       raise RuntimeError("Not connected")
+
+               if isinstance(message, dict):
+                       message = tornado.escape.json_encode(message)
+
+               try:
+                       return await self.conn.write_message(message, **kwargs)
 
-               # Send request
-               await self._request("POST", "/builders/info", **data)
+               except tornado.websocket.WebSocketClosedError as e:
+                       # Try to reconnect
+                       await self.reconnect()
 
-       async def send_builder_stats(self):
+                       # Try to send the message again
+                       return await self.write_message(message, **kwargs)
+
+
+class QueueProxy(HubObject):
+       url = "/api/v1/jobs/queue"
+
+
+class BuilderStats(HubObject):
+       """
+               Proxy for Builder Stats
+       """
+       def init(self):
+               # Fetch processor information
+               self.cpu = cpuinfo.get_cpu_info()
+
+               # Fetch the native architecture
+               self.native_arch = _pakfire.native_arch()
+
+       @property
+       def url(self):
+               return "/api/v1/builders/stats"
+
+       async def submit(self):
                log.debug("Sending stat message to hub...")
 
                # Fetch processor information
-               cpu = psutil.cpu_times_percent()
+               cpu_times = psutil.cpu_times_percent()
 
                # Fetch memory/swap information
                mem  = psutil.virtual_memory()
@@ -393,18 +554,27 @@ class Hub(object):
                # Fetch load average
                loadavg = psutil.getloadavg()
 
-               data = {
-                       # CPU
-                       "cpu_user"       : cpu.user,
-                       "cpu_nice"       : cpu.nice,
-                       "cpu_system"     : cpu.system,
-                       "cpu_idle"       : cpu.idle,
-                       "cpu_iowait"     : cpu.iowait,
-                       "cpu_irq"        : cpu.irq,
-                       "cpu_softirq"    : cpu.softirq,
-                       "cpu_steal"      : cpu.steal,
-                       "cpu_guest"      : cpu.guest,
-                       "cpu_guest_nice" : cpu.guest_nice,
+               await self.write_message({
+                       # CPU info
+                       "cpu_model"       : self.cpu.get("brand_raw"),
+                       "cpu_count"       : self.cpu.get("count"),
+                       "cpu_arch"        : self.native_arch,
+
+                       # Pakfire + OS
+                       "pakfire_version" : PAKFIRE_VERSION,
+                       "os_name"         : util.get_distro_name(),
+
+                       # CPU Times
+                       "cpu_user"       : cpu_times.user,
+                       "cpu_nice"       : cpu_times.nice,
+                       "cpu_system"     : cpu_times.system,
+                       "cpu_idle"       : cpu_times.idle,
+                       "cpu_iowait"     : cpu_times.iowait,
+                       "cpu_irq"        : cpu_times.irq,
+                       "cpu_softirq"    : cpu_times.softirq,
+                       "cpu_steal"      : cpu_times.steal,
+                       "cpu_guest"      : cpu_times.guest,
+                       "cpu_guest_nice" : cpu_times.guest_nice,
 
                        # Load average
                        "loadavg1"       : loadavg[0],
@@ -426,109 +596,59 @@ class Hub(object):
                        "swap_total"     : swap.total,
                        "swap_used"      : swap.used,
                        "swap_free"      : swap.free,
-               }
-
-               # Send request
-               await self._request("POST", "/builders/stats", **data)
-
-       async def queue(self, job_received_callback):
-               """
-                       Connects to the hub and asks for a build job
-               """
-               on_message_callback = functools.partial(
-                       self._decode_json_message, job_received_callback)
-
-               # Join the queue
-               queue = await self._request("GET", "/queue", websocket=True, ping=10,
-                       on_message_callback=on_message_callback)
-
-               log.debug("Joined the queue")
-
-               return queue
-
-       @staticmethod
-       def _decode_json_message(callback, message):
-               """
-                       Takes a received message and decodes it.
+               })
 
-                       It will then call the callback with the decoded message.
-               """
-               # Ignore empty messages
-               if message is None:
-                       return
 
-               try:
-                       message = json.loads(message)
-               except json.JSONDecodeError:
-                       log.error("Could not decode JSON message:\n%s" % message)
-                       return
+class Job(HubObject):
+       """
+               Proxy for Build Jobs
+       """
+       def init(self, id):
+               self.id = id
 
-               return callback(message)
+       @property
+       def url(self):
+               return "/api/v1/jobs/%s" % self.id
 
-       async def job(self, id):
+       async def status(self, status):
                """
-                       Connects to the given job
+                       Sends a new status to the hub
                """
-               # Connect to the hub
-               conn = await self._request("GET", "/jobs/%s/builder" % id,
-                       websocket=True, ping=10)
-
-               # Return a Job proxy
-               return Job(self, id, conn)
+               await self.write_message({
+                       "message" : "status",
+                       "status"  : status,
+               })
 
-       async def finish_job(self, job_id, success, packages=None, log=None):
+       async def finished(self, success, packages=None, logfile=None):
                """
                        Will tell the hub that a job has finished
                """
                # Upload the log file
-               if log:
-                       log = await self.upload(log, filename="%s.log" % job_id)
+               if logfile:
+                       logfile = await self.hub.upload(logfile, filename="%s.log" % self.id)
 
                # Upload the packages
                if packages:
-                       packages = await self.upload_multi(*packages)
+                       packages = await self.hub.upload_multi(*packages)
 
                # Send the request
-               response = await self._request("POST", "/jobs/%s/finished" % job_id,
-                       success=success, log=log, packages=packages)
+               await self.write_message({
+                       "message"  : "finished",
+                       "success"  : success,
+                       "logfile"  : logfile,
+                       "packages" : packages,
+               })
 
                # Handle the response
                # XXX TODO
 
-
-class Job(object):
-       """
-               Proxy for Build Jobs
-       """
-       def __init__(self, hub, id, conn):
-               self.hub  = hub
-               self.id   = id
-               self.conn = conn
-
-       async def _write_message(self, message, **kwargs):
-               """
-                       Sends a message but encodes it into JSON first
-               """
-               if isinstance(message, dict):
-                       message = tornado.escape.json_encode(message)
-
-               return await self.conn.write_message(message, **kwargs)
-
-       async def status(self, status):
-               """
-                       Sends a new status to the hub
-               """
-               await self._write_message({
-                       "message" : "status",
-                       "status"  : status,
-               })
-
-       async def log(self, level, message):
+       async def log(self, timestamp, level, message):
                """
                        Sends a log message to the hub
                """
-               await self._write_message({
-                       "message" : "log",
-                       "level"   : level,
-                       "log"     : message,
+               await self.write_message({
+                       "message"   : "log",
+                       "timestamp" : timestamp,
+                       "level"     : level,
+                       "log"       : message,
                })