]> git.ipfire.org Git - pakfire.git/commitdiff
daemon: Drop keepalive process
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 26 May 2022 11:58:23 +0000 (11:58 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 26 May 2022 11:58:23 +0000 (11:58 +0000)
The old companion process which sent keepalive messages to the hub has
been dropped and the main daemon process is now conducting this task.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
configure.ac
src/pakfire/daemon.py
src/pakfire/hub.py

index e75263b2f1b118d90294b8dc91dc6450f17a3277..9bb6a10b9d6014ddcae4605fe062fd18c16c7df5 100644 (file)
@@ -232,6 +232,7 @@ AC_CHECK_FUNCS([ \
 # Python Stuff
 AM_PATH_PYTHON([3.6])
 
+AX_PYTHON_MODULE([psutil], [fatal])
 AX_PYTHON_MODULE([systemd], [fatal])
 AX_PYTHON_MODULE([tornado], [fatal])
 
index 7647b363d6f9e75038ec703f3421e277ecb8c01c..74a3ef6daecb8f70c47fe4f2c2ff9de07f7840fa 100644 (file)
@@ -1,5 +1,6 @@
 #!/usr/bin/python3
 
+import asyncio
 import hashlib
 import json
 import multiprocessing
@@ -46,11 +47,8 @@ class Daemon(object):
                # Indicates if this daemon is in running mode.
                self.__running = True
 
-               # Create daemon that sends keep-alive messages.
-               self.keepalive = PakfireDaemonKeepalive(self.hub)
-
                # List of worker processes.
-               self.__workers = [self.keepalive]
+               self.__workers = []
 
                ### Configuration
                # Number of workers in waiting state.
@@ -72,33 +70,23 @@ class Daemon(object):
                # Create connection to the hub
                return hub.Hub(url, hostname, password)
 
-       def run(self, heartbeat=30):
+       async def run(self):
                """
                        Main loop.
                """
                # Register signal handlers.
                self.register_signal_handlers()
 
-               # Start keepalive process.
-               self.keepalive.start()
+               # Send builder information
+               await self.hub.send_builder_info()
 
-               # Run main loop.
+               # Run main loop
                while self.__running:
-                       time_started = time.time()
-
-                       # Check if keepalive process is still alive.
-                       if not self.keepalive.is_alive():
-                               self.restart_keepalive(wait=10)
-
-                       # Spawn a sufficient number of worker processes.
-                       self.spawn_workers_if_needed()
+                       # Send some information about this builder
+                       await self.hub.send_builder_stats()
 
-                       # Get runtime of this loop iteration.
-                       time_elapsed = time.time() - time_started
-
-                       # Wait if the heartbeat time has not been reached, yet.
-                       if time_elapsed < heartbeat:
-                               time.sleep(heartbeat - time_elapsed)
+                       # Sleep for 10 seconds
+                       await asyncio.sleep(10)
 
                # Main loop has ended, but we wait until all workers have finished.
                self.terminate_all_workers()
@@ -113,55 +101,6 @@ class Daemon(object):
                log.info(_("Shutting down..."))
                self.__running = False
 
-       def restart_keepalive(self, wait=None):
-               log.critical(_("Restarting keepalive process"))
-
-               # Send SIGTERM to really end the process.
-               self.keepalive.terminate()
-
-               # Wait for the process to terminate.
-               if wait:
-                       self.keepalive.join(wait)
-
-               # Remove the keepalive process from the process list.
-               try:
-                       self.__workers.remove(self.keepalive)
-               except ValueError:
-                       pass
-
-               # Create a new process and start it.
-               self.keepalive = PakfireDaemonKeepalive(self.hub)
-               self.keepalive.start()
-
-               # Add the process to the process list.
-               self.__workers.append(self.keepalive)
-
-       def spawn_workers_if_needed(self, *args, **kwargs):
-               """
-                       Spawns more workers if needed.
-               """
-               # Do not create any more processes when the daemon is shutting down.
-               if not self.__running:
-                       return
-
-               # Cleanup all other workers.
-               self.cleanup_workers()
-
-               # Do not create more workers if there are already enough workers
-               # active.
-               if len(self.workers) >= self.max_running:
-                       log.warning("More workers running than allowed")
-                       return
-
-               # Do nothing, if there is are already enough workers waiting.
-               wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
-               if wanted_waiting_workers <= 0:
-                       return
-
-               # Spawn a new worker.
-               for i in range(wanted_waiting_workers):
-                       self.spawn_worker(*args, **kwargs)
-
        def spawn_worker(self, *args, **kwargs):
                """
                        Spawns a new worker process.
@@ -185,12 +124,10 @@ class Daemon(object):
                        Terminates all workers.
                """
                # First send SIGTERM to all processes.
-               self.terminate_worker(self.keepalive)
                for worker in self.workers:
                        self.terminate_worker(worker)
 
                # Then wait until they all have finished.
-               self.keepalive.join()
                for worker in self.workers:
                        worker.join()
 
@@ -254,60 +191,7 @@ class Daemon(object):
                        Handle signal SIGCHLD.
                """
                # Spawn new workers if necessary.
-               self.spawn_workers_if_needed()
-
-       def handle_SIGTERM(self, signum, frame):
-               """
-                       Handle signal SIGTERM.
-               """
-               # Just shut down.
-               self.shutdown()
-
-
-class PakfireDaemonKeepalive(multiprocessing.Process):
-       def __init__(self, hub):
-               multiprocessing.Process.__init__(self)
-               self.hub = hub
-
-       def run(self, heartbeat=30):
-               # Register signal handlers.
-               self.register_signal_handlers()
-
-               # Send our profile to the hub.
-               self.send_builder_info()
-
-               while True:
-                       time_started = time.time()
-
-                       # Send keepalive message.
-                       self.send_keepalive()
-
-                       # Get runtime of this loop iteration.
-                       time_elapsed = time.time() - time_started
-
-                       # Wait if the heartbeat time has not been reached, yet.
-                       if time_elapsed < heartbeat:
-                               time.sleep(heartbeat - time_elapsed)
-
-       def shutdown(self):
-               """
-                       Ends this process immediately.
-               """
-               sys.exit(1)
-
-       # Signal handling.
-
-       def register_signal_handlers(self):
-               signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
-               signal.signal(signal.SIGINT,  self.handle_SIGTERM)
-               signal.signal(signal.SIGTERM, self.handle_SIGTERM)
-
-       def handle_SIGCHLD(self, signum, frame):
-               """
-                       Handle signal SIGCHLD.
-               """
-               # Must be here so that SIGCHLD won't be propagated to
-               # PakfireDaemon.
+               #self.spawn_workers_if_needed()
                pass
 
        def handle_SIGTERM(self, signum, frame):
@@ -317,48 +201,6 @@ class PakfireDaemonKeepalive(multiprocessing.Process):
                # Just shut down.
                self.shutdown()
 
-       # Talking to the hub.
-
-       def send_builder_info(self):
-               log.info(_("Sending builder information to hub..."))
-
-               data = {
-                       # CPU info
-                       "cpu_model"       : system.cpu_model,
-                       "cpu_count"       : system.cpu_count,
-                       "cpu_arch"        : _pakfire.native_arch(),
-                       "cpu_bogomips"    : system.cpu_bogomips,
-
-                       # Memory + swap
-                       "mem_total"       : system.memory,
-                       "swap_total"      : system.swap_total,
-
-                       # Pakfire + OS
-                       "pakfire_version" : PAKFIRE_VERSION,
-                       "os_name"         : system.distro.pretty_name,
-               }
-               self.hub._request("/builders/info", method="POST", data=data)
-
-       def send_keepalive(self):
-               log.debug("Sending keepalive message to hub...")
-
-               data = {
-                       # Load average.
-                       "loadavg1"   : system.loadavg1,
-                       "loadavg5"   : system.loadavg5,
-                       "loadavg15"  : system.loadavg15,
-
-                       # Memory
-                       "mem_total"  : system.memory_total,
-                       "mem_free"   : system.memory_free,
-
-                       # Swap
-                       "swap_total" : system.swap_total,
-                       "swap_free"  : system.swap_free,
-               }
-
-               self.hub._request("/builders/keepalive", method="POST", data=data)
-
 
 class PakfireWorker(multiprocessing.Process):
        def __init__(self, hub, waiting=None):
index cccd532a5c2c4ab47044209a4c741a9da8933c1e..d8108e4f768d19316e8f347295a75f36c313c7df 100644 (file)
 #                                                                             #
 ###############################################################################
 
+import cpuinfo
 import functools
 import hashlib
 import json
 import logging
 import os.path
+import psutil
 import tornado.httpclient
 import urllib.parse
 
 from . import _pakfire
+from .distro import Distribution
 from .constants import *
 from .i18n import _
 
@@ -219,3 +222,85 @@ class Hub(object):
                                h.update(buf)
 
                return "%s:%s" % (algo, h.hexdigest())
+
+       # Builder
+
+       async def send_builder_info(self):
+               """
+                       Sends information about this host to the hub.
+
+                       This information is something that doesn't change during
+                       the lifetime of the daemon.
+               """
+               log.info(_("Sending builder information to hub..."))
+
+               # Fetch distribution information
+               distro = Distribution()
+
+               # Fetch processor information
+               cpu = cpuinfo.get_cpu_info()
+
+               data = {
+                       # CPU info
+                       "cpu_model"       : cpu.get("brand_raw"),
+                       "cpu_count"       : cpu.get("count"),
+                       "cpu_arch"        : _pakfire.native_arch(),
+
+                       # Pakfire + OS
+                       "pakfire_version" : PAKFIRE_VERSION,
+                       "os_name"         : distro.pretty_name,
+               }
+
+               # Send request
+               await self._request("POST", "/builders/info", **data)
+
+       async def send_builder_stats(self):
+               log.debug("Sending stat message to hub...")
+
+               # Fetch processor information
+               cpu = psutil.cpu_times_percent()
+
+               # Fetch memory/swap information
+               mem  = psutil.virtual_memory()
+               swap = psutil.swap_memory()
+
+               # Fetch load average
+               loadavg = psutil.getloadavg()
+
+               data = {
+                       # CPU
+                       "cpu_user"       : cpu.user,
+                       "cpu_nice"       : cpu.nice,
+                       "cpu_system"     : cpu.system,
+                       "cpu_idle"       : cpu.idle,
+                       "cpu_iowait"     : cpu.iowait,
+                       "cpu_irq"        : cpu.irq,
+                       "cpu_softirq"    : cpu.softirq,
+                       "cpu_steal"      : cpu.steal,
+                       "cpu_guest"      : cpu.guest,
+                       "cpu_guest_nice" : cpu.guest_nice,
+
+                       # Load average
+                       "loadavg1"       : loadavg[0],
+                       "loadavg5"       : loadavg[1],
+                       "loadavg15"      : loadavg[2],
+
+                       # Memory
+                       "mem_total"      : mem.total,
+                       "mem_available"  : mem.available,
+                       "mem_used"       : mem.used,
+                       "mem_free"       : mem.free,
+                       "mem_active"     : mem.active,
+                       "mem_inactive"   : mem.inactive,
+                       "mem_buffers"    : mem.buffers,
+                       "mem_cached"     : mem.cached,
+                       "mem_shared"     : mem.shared,
+
+                       # Swap
+                       "swap_total"     : swap.total,
+                       "swap_used"      : swap.used,
+                       "swap_free"      : swap.free,
+               }
+
+               # Send request
+               await self._request("POST", "/builders/stats", **data)