From da170b3619adfb934215abdab084e031cda98c82 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Thu, 26 May 2022 11:58:23 +0000 Subject: [PATCH] daemon: Drop keepalive process The old companion process which sent keepalive messages to the hub has been dropped and the main daemon process is now conducting this task. Signed-off-by: Michael Tremer --- configure.ac | 1 + src/pakfire/daemon.py | 180 +++--------------------------------------- src/pakfire/hub.py | 85 ++++++++++++++++++++ 3 files changed, 97 insertions(+), 169 deletions(-) diff --git a/configure.ac b/configure.ac index e75263b2f..9bb6a10b9 100644 --- a/configure.ac +++ b/configure.ac @@ -232,6 +232,7 @@ AC_CHECK_FUNCS([ \ # Python Stuff AM_PATH_PYTHON([3.6]) +AX_PYTHON_MODULE([psutil], [fatal]) AX_PYTHON_MODULE([systemd], [fatal]) AX_PYTHON_MODULE([tornado], [fatal]) diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index 7647b363d..74a3ef6da 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 +import asyncio import hashlib import json import multiprocessing @@ -46,11 +47,8 @@ class Daemon(object): # Indicates if this daemon is in running mode. self.__running = True - # Create daemon that sends keep-alive messages. - self.keepalive = PakfireDaemonKeepalive(self.hub) - # List of worker processes. - self.__workers = [self.keepalive] + self.__workers = [] ### Configuration # Number of workers in waiting state. @@ -72,33 +70,23 @@ class Daemon(object): # Create connection to the hub return hub.Hub(url, hostname, password) - def run(self, heartbeat=30): + async def run(self): """ Main loop. """ # Register signal handlers. self.register_signal_handlers() - # Start keepalive process. - self.keepalive.start() + # Send builder information + await self.hub.send_builder_info() - # Run main loop. + # Run main loop while self.__running: - time_started = time.time() - - # Check if keepalive process is still alive. - if not self.keepalive.is_alive(): - self.restart_keepalive(wait=10) - - # Spawn a sufficient number of worker processes. - self.spawn_workers_if_needed() + # Send some information about this builder + await self.hub.send_builder_stats() - # Get runtime of this loop iteration. - time_elapsed = time.time() - time_started - - # Wait if the heartbeat time has not been reached, yet. - if time_elapsed < heartbeat: - time.sleep(heartbeat - time_elapsed) + # Sleep for 10 seconds + await asyncio.sleep(10) # Main loop has ended, but we wait until all workers have finished. self.terminate_all_workers() @@ -113,55 +101,6 @@ class Daemon(object): log.info(_("Shutting down...")) self.__running = False - def restart_keepalive(self, wait=None): - log.critical(_("Restarting keepalive process")) - - # Send SIGTERM to really end the process. - self.keepalive.terminate() - - # Wait for the process to terminate. - if wait: - self.keepalive.join(wait) - - # Remove the keepalive process from the process list. - try: - self.__workers.remove(self.keepalive) - except ValueError: - pass - - # Create a new process and start it. - self.keepalive = PakfireDaemonKeepalive(self.hub) - self.keepalive.start() - - # Add the process to the process list. - self.__workers.append(self.keepalive) - - def spawn_workers_if_needed(self, *args, **kwargs): - """ - Spawns more workers if needed. - """ - # Do not create any more processes when the daemon is shutting down. - if not self.__running: - return - - # Cleanup all other workers. - self.cleanup_workers() - - # Do not create more workers if there are already enough workers - # active. - if len(self.workers) >= self.max_running: - log.warning("More workers running than allowed") - return - - # Do nothing, if there is are already enough workers waiting. - wanted_waiting_workers = self.max_waiting - len(self.waiting_workers) - if wanted_waiting_workers <= 0: - return - - # Spawn a new worker. - for i in range(wanted_waiting_workers): - self.spawn_worker(*args, **kwargs) - def spawn_worker(self, *args, **kwargs): """ Spawns a new worker process. @@ -185,12 +124,10 @@ class Daemon(object): Terminates all workers. """ # First send SIGTERM to all processes. - self.terminate_worker(self.keepalive) for worker in self.workers: self.terminate_worker(worker) # Then wait until they all have finished. - self.keepalive.join() for worker in self.workers: worker.join() @@ -254,60 +191,7 @@ class Daemon(object): Handle signal SIGCHLD. """ # Spawn new workers if necessary. - self.spawn_workers_if_needed() - - def handle_SIGTERM(self, signum, frame): - """ - Handle signal SIGTERM. - """ - # Just shut down. - self.shutdown() - - -class PakfireDaemonKeepalive(multiprocessing.Process): - def __init__(self, hub): - multiprocessing.Process.__init__(self) - self.hub = hub - - def run(self, heartbeat=30): - # Register signal handlers. - self.register_signal_handlers() - - # Send our profile to the hub. - self.send_builder_info() - - while True: - time_started = time.time() - - # Send keepalive message. - self.send_keepalive() - - # Get runtime of this loop iteration. - time_elapsed = time.time() - time_started - - # Wait if the heartbeat time has not been reached, yet. - if time_elapsed < heartbeat: - time.sleep(heartbeat - time_elapsed) - - def shutdown(self): - """ - Ends this process immediately. - """ - sys.exit(1) - - # Signal handling. - - def register_signal_handlers(self): - signal.signal(signal.SIGCHLD, self.handle_SIGCHLD) - signal.signal(signal.SIGINT, self.handle_SIGTERM) - signal.signal(signal.SIGTERM, self.handle_SIGTERM) - - def handle_SIGCHLD(self, signum, frame): - """ - Handle signal SIGCHLD. - """ - # Must be here so that SIGCHLD won't be propagated to - # PakfireDaemon. + #self.spawn_workers_if_needed() pass def handle_SIGTERM(self, signum, frame): @@ -317,48 +201,6 @@ class PakfireDaemonKeepalive(multiprocessing.Process): # Just shut down. self.shutdown() - # Talking to the hub. - - def send_builder_info(self): - log.info(_("Sending builder information to hub...")) - - data = { - # CPU info - "cpu_model" : system.cpu_model, - "cpu_count" : system.cpu_count, - "cpu_arch" : _pakfire.native_arch(), - "cpu_bogomips" : system.cpu_bogomips, - - # Memory + swap - "mem_total" : system.memory, - "swap_total" : system.swap_total, - - # Pakfire + OS - "pakfire_version" : PAKFIRE_VERSION, - "os_name" : system.distro.pretty_name, - } - self.hub._request("/builders/info", method="POST", data=data) - - def send_keepalive(self): - log.debug("Sending keepalive message to hub...") - - data = { - # Load average. - "loadavg1" : system.loadavg1, - "loadavg5" : system.loadavg5, - "loadavg15" : system.loadavg15, - - # Memory - "mem_total" : system.memory_total, - "mem_free" : system.memory_free, - - # Swap - "swap_total" : system.swap_total, - "swap_free" : system.swap_free, - } - - self.hub._request("/builders/keepalive", method="POST", data=data) - class PakfireWorker(multiprocessing.Process): def __init__(self, hub, waiting=None): diff --git a/src/pakfire/hub.py b/src/pakfire/hub.py index cccd532a5..d8108e4f7 100644 --- a/src/pakfire/hub.py +++ b/src/pakfire/hub.py @@ -19,15 +19,18 @@ # # ############################################################################### +import cpuinfo import functools import hashlib import json import logging import os.path +import psutil import tornado.httpclient import urllib.parse from . import _pakfire +from .distro import Distribution from .constants import * from .i18n import _ @@ -219,3 +222,85 @@ class Hub(object): h.update(buf) return "%s:%s" % (algo, h.hexdigest()) + + # Builder + + async def send_builder_info(self): + """ + Sends information about this host to the hub. + + This information is something that doesn't change during + the lifetime of the daemon. + """ + log.info(_("Sending builder information to hub...")) + + # Fetch distribution information + distro = Distribution() + + # Fetch processor information + cpu = cpuinfo.get_cpu_info() + + data = { + # CPU info + "cpu_model" : cpu.get("brand_raw"), + "cpu_count" : cpu.get("count"), + "cpu_arch" : _pakfire.native_arch(), + + # Pakfire + OS + "pakfire_version" : PAKFIRE_VERSION, + "os_name" : distro.pretty_name, + } + + # Send request + await self._request("POST", "/builders/info", **data) + + async def send_builder_stats(self): + log.debug("Sending stat message to hub...") + + # Fetch processor information + cpu = psutil.cpu_times_percent() + + # Fetch memory/swap information + mem = psutil.virtual_memory() + swap = psutil.swap_memory() + + # Fetch load average + loadavg = psutil.getloadavg() + + data = { + # CPU + "cpu_user" : cpu.user, + "cpu_nice" : cpu.nice, + "cpu_system" : cpu.system, + "cpu_idle" : cpu.idle, + "cpu_iowait" : cpu.iowait, + "cpu_irq" : cpu.irq, + "cpu_softirq" : cpu.softirq, + "cpu_steal" : cpu.steal, + "cpu_guest" : cpu.guest, + "cpu_guest_nice" : cpu.guest_nice, + + # Load average + "loadavg1" : loadavg[0], + "loadavg5" : loadavg[1], + "loadavg15" : loadavg[2], + + # Memory + "mem_total" : mem.total, + "mem_available" : mem.available, + "mem_used" : mem.used, + "mem_free" : mem.free, + "mem_active" : mem.active, + "mem_inactive" : mem.inactive, + "mem_buffers" : mem.buffers, + "mem_cached" : mem.cached, + "mem_shared" : mem.shared, + + # Swap + "swap_total" : swap.total, + "swap_used" : swap.used, + "swap_free" : swap.free, + } + + # Send request + await self._request("POST", "/builders/stats", **data) -- 2.47.3