#!/usr/bin/python3
+import asyncio
import hashlib
import json
import multiprocessing
# Indicates if this daemon is in running mode.
self.__running = True
- # Create daemon that sends keep-alive messages.
- self.keepalive = PakfireDaemonKeepalive(self.hub)
-
# List of worker processes.
- self.__workers = [self.keepalive]
+ self.__workers = []
### Configuration
# Number of workers in waiting state.
# Create connection to the hub
return hub.Hub(url, hostname, password)
- def run(self, heartbeat=30):
+ async def run(self):
"""
Main loop.
"""
# Register signal handlers.
self.register_signal_handlers()
- # Start keepalive process.
- self.keepalive.start()
+ # Send builder information
+ await self.hub.send_builder_info()
- # Run main loop.
+ # Run main loop
while self.__running:
- time_started = time.time()
-
- # Check if keepalive process is still alive.
- if not self.keepalive.is_alive():
- self.restart_keepalive(wait=10)
-
- # Spawn a sufficient number of worker processes.
- self.spawn_workers_if_needed()
+ # Send some information about this builder
+ await self.hub.send_builder_stats()
- # Get runtime of this loop iteration.
- time_elapsed = time.time() - time_started
-
- # Wait if the heartbeat time has not been reached, yet.
- if time_elapsed < heartbeat:
- time.sleep(heartbeat - time_elapsed)
+ # Sleep for 10 seconds
+ await asyncio.sleep(10)
# Main loop has ended, but we wait until all workers have finished.
self.terminate_all_workers()
log.info(_("Shutting down..."))
self.__running = False
- def restart_keepalive(self, wait=None):
- log.critical(_("Restarting keepalive process"))
-
- # Send SIGTERM to really end the process.
- self.keepalive.terminate()
-
- # Wait for the process to terminate.
- if wait:
- self.keepalive.join(wait)
-
- # Remove the keepalive process from the process list.
- try:
- self.__workers.remove(self.keepalive)
- except ValueError:
- pass
-
- # Create a new process and start it.
- self.keepalive = PakfireDaemonKeepalive(self.hub)
- self.keepalive.start()
-
- # Add the process to the process list.
- self.__workers.append(self.keepalive)
-
- def spawn_workers_if_needed(self, *args, **kwargs):
- """
- Spawns more workers if needed.
- """
- # Do not create any more processes when the daemon is shutting down.
- if not self.__running:
- return
-
- # Cleanup all other workers.
- self.cleanup_workers()
-
- # Do not create more workers if there are already enough workers
- # active.
- if len(self.workers) >= self.max_running:
- log.warning("More workers running than allowed")
- return
-
- # Do nothing, if there is are already enough workers waiting.
- wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
- if wanted_waiting_workers <= 0:
- return
-
- # Spawn a new worker.
- for i in range(wanted_waiting_workers):
- self.spawn_worker(*args, **kwargs)
-
def spawn_worker(self, *args, **kwargs):
"""
Spawns a new worker process.
Terminates all workers.
"""
# First send SIGTERM to all processes.
- self.terminate_worker(self.keepalive)
for worker in self.workers:
self.terminate_worker(worker)
# Then wait until they all have finished.
- self.keepalive.join()
for worker in self.workers:
worker.join()
Handle signal SIGCHLD.
"""
# Spawn new workers if necessary.
- self.spawn_workers_if_needed()
-
- def handle_SIGTERM(self, signum, frame):
- """
- Handle signal SIGTERM.
- """
- # Just shut down.
- self.shutdown()
-
-
-class PakfireDaemonKeepalive(multiprocessing.Process):
- def __init__(self, hub):
- multiprocessing.Process.__init__(self)
- self.hub = hub
-
- def run(self, heartbeat=30):
- # Register signal handlers.
- self.register_signal_handlers()
-
- # Send our profile to the hub.
- self.send_builder_info()
-
- while True:
- time_started = time.time()
-
- # Send keepalive message.
- self.send_keepalive()
-
- # Get runtime of this loop iteration.
- time_elapsed = time.time() - time_started
-
- # Wait if the heartbeat time has not been reached, yet.
- if time_elapsed < heartbeat:
- time.sleep(heartbeat - time_elapsed)
-
- def shutdown(self):
- """
- Ends this process immediately.
- """
- sys.exit(1)
-
- # Signal handling.
-
- def register_signal_handlers(self):
- signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
- signal.signal(signal.SIGINT, self.handle_SIGTERM)
- signal.signal(signal.SIGTERM, self.handle_SIGTERM)
-
- def handle_SIGCHLD(self, signum, frame):
- """
- Handle signal SIGCHLD.
- """
- # Must be here so that SIGCHLD won't be propagated to
- # PakfireDaemon.
+ #self.spawn_workers_if_needed()
pass
def handle_SIGTERM(self, signum, frame):
# Just shut down.
self.shutdown()
- # Talking to the hub.
-
- def send_builder_info(self):
- log.info(_("Sending builder information to hub..."))
-
- data = {
- # CPU info
- "cpu_model" : system.cpu_model,
- "cpu_count" : system.cpu_count,
- "cpu_arch" : _pakfire.native_arch(),
- "cpu_bogomips" : system.cpu_bogomips,
-
- # Memory + swap
- "mem_total" : system.memory,
- "swap_total" : system.swap_total,
-
- # Pakfire + OS
- "pakfire_version" : PAKFIRE_VERSION,
- "os_name" : system.distro.pretty_name,
- }
- self.hub._request("/builders/info", method="POST", data=data)
-
- def send_keepalive(self):
- log.debug("Sending keepalive message to hub...")
-
- data = {
- # Load average.
- "loadavg1" : system.loadavg1,
- "loadavg5" : system.loadavg5,
- "loadavg15" : system.loadavg15,
-
- # Memory
- "mem_total" : system.memory_total,
- "mem_free" : system.memory_free,
-
- # Swap
- "swap_total" : system.swap_total,
- "swap_free" : system.swap_free,
- }
-
- self.hub._request("/builders/keepalive", method="POST", data=data)
-
class PakfireWorker(multiprocessing.Process):
def __init__(self, hub, waiting=None):
# #
###############################################################################
+import cpuinfo
import functools
import hashlib
import json
import logging
import os.path
+import psutil
import tornado.httpclient
import urllib.parse
from . import _pakfire
+from .distro import Distribution
from .constants import *
from .i18n import _
h.update(buf)
return "%s:%s" % (algo, h.hexdigest())
+
+ # Builder
+
+ async def send_builder_info(self):
+ """
+ Sends information about this host to the hub.
+
+ This information is something that doesn't change during
+ the lifetime of the daemon.
+ """
+ log.info(_("Sending builder information to hub..."))
+
+ # Fetch distribution information
+ distro = Distribution()
+
+ # Fetch processor information
+ cpu = cpuinfo.get_cpu_info()
+
+ data = {
+ # CPU info
+ "cpu_model" : cpu.get("brand_raw"),
+ "cpu_count" : cpu.get("count"),
+ "cpu_arch" : _pakfire.native_arch(),
+
+ # Pakfire + OS
+ "pakfire_version" : PAKFIRE_VERSION,
+ "os_name" : distro.pretty_name,
+ }
+
+ # Send request
+ await self._request("POST", "/builders/info", **data)
+
+ async def send_builder_stats(self):
+ log.debug("Sending stat message to hub...")
+
+ # Fetch processor information
+ cpu = psutil.cpu_times_percent()
+
+ # Fetch memory/swap information
+ mem = psutil.virtual_memory()
+ swap = psutil.swap_memory()
+
+ # Fetch load average
+ loadavg = psutil.getloadavg()
+
+ data = {
+ # CPU
+ "cpu_user" : cpu.user,
+ "cpu_nice" : cpu.nice,
+ "cpu_system" : cpu.system,
+ "cpu_idle" : cpu.idle,
+ "cpu_iowait" : cpu.iowait,
+ "cpu_irq" : cpu.irq,
+ "cpu_softirq" : cpu.softirq,
+ "cpu_steal" : cpu.steal,
+ "cpu_guest" : cpu.guest,
+ "cpu_guest_nice" : cpu.guest_nice,
+
+ # Load average
+ "loadavg1" : loadavg[0],
+ "loadavg5" : loadavg[1],
+ "loadavg15" : loadavg[2],
+
+ # Memory
+ "mem_total" : mem.total,
+ "mem_available" : mem.available,
+ "mem_used" : mem.used,
+ "mem_free" : mem.free,
+ "mem_active" : mem.active,
+ "mem_inactive" : mem.inactive,
+ "mem_buffers" : mem.buffers,
+ "mem_cached" : mem.cached,
+ "mem_shared" : mem.shared,
+
+ # Swap
+ "swap_total" : swap.total,
+ "swap_used" : swap.used,
+ "swap_free" : swap.free,
+ }
+
+ # Send request
+ await self._request("POST", "/builders/stats", **data)