From 2b6cc06d4e5b0ad7f029746f765f9d22c57a0c51 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Thu, 5 Apr 2012 19:41:48 +0200 Subject: [PATCH] Add cgroup support. If the kernel of the host system supports cgroups, pakfire can take advantage of them. Build processes will be put into one big group. --- python/pakfire/builder.py | 63 ++++++-- python/pakfire/cgroup.py | 316 ++++++++++++++++++++++++++++++++++++++ python/pakfire/chroot.py | 8 +- 3 files changed, 368 insertions(+), 19 deletions(-) create mode 100644 python/pakfire/cgroup.py diff --git a/python/pakfire/builder.py b/python/pakfire/builder.py index 6008d66f7..6123b4d93 100644 --- a/python/pakfire/builder.py +++ b/python/pakfire/builder.py @@ -25,12 +25,14 @@ import math import os import re import shutil +import signal import socket import tempfile import time import uuid import base +import cgroup import chroot import logger import packages @@ -105,6 +107,14 @@ class BuildEnviron(object): # If no logile was given, we use the root logger. self.log = logging.getLogger("pakfire") + # Initialize a cgroup (if supported). + self.cgroup = None + if cgroup.supported(): + self.cgroup = cgroup.CGroup("pakfire/builder/%s" % self.build_id) + + # Attach the pakfire-builder process to the parent group. + self.cgroup.parent.attach() + # Log information about pakfire and some more information, when we # are running in release mode. if self.mode == "release": @@ -218,8 +228,24 @@ class BuildEnviron(object): self.extract() def stop(self): - # Kill all still running processes. - util.orphans_kill(self.path) + if self.cgroup: + # Kill all still running processes in the cgroup. + self.cgroup.kill_and_wait() + + # Remove cgroup and all parent cgroups if they are empty. 
+ self.cgroup.migrate_task(self.cgroup.root, os.getpid()) + self.cgroup.destroy() + + parent = self.cgroup.parent + while parent: + if not parent.is_empty(recursive=True): + break + + parent.destroy() + parent = parent.parent + + else: + util.orphans_kill(self.path) # Close pakfire instance. del self.pakfire @@ -666,6 +692,9 @@ class BuildEnviron(object): if not kwargs.has_key("chrootPath"): kwargs["chrootPath"] = self.chrootPath() + if not kwargs.has_key("cgroup"): + kwargs["cgroup"] = self.cgroup + ret = chroot.do( command, personality=personality, @@ -943,7 +972,7 @@ class Builder(object): return environ - def do(self, command, shell=True, personality=None, cwd=None, *args, **kwargs): + def do(self, command, shell=True, *args, **kwargs): try: logger = kwargs["logger"] except KeyError: @@ -956,26 +985,28 @@ class Builder(object): log.debug(" %s=%s" % (k, v)) # Update personality it none was set - if not personality: - personality = self.distro.personality + if not kwargs.has_key("personality"): + kwargs["personality"] = self.distro.personality - if not cwd: - cwd = "/%s" % LOCAL_TMP_PATH + if not kwargs.has_key("cwd"): + kwargs["cwd"] = "/%s" % LOCAL_TMP_PATH # Make every shell to a login shell because we set a lot of # environment things there. 
if shell: command = ["bash", "--login", "-c", command] + kwargs["shell"] = False - return chroot.do( - command, - personality=personality, - shell=False, - env=self.environ, - cwd=cwd, - *args, - **kwargs - ) + kwargs["env"] = self.environ + + try: + return chroot.do(command, *args, **kwargs) + except Error: + if not logger: + logger = logging.getLogger("pakfire") + + logger.error("Command exited with an error: %s" % command) + raise def run_script(self, script, *args): if not script.startswith("/"): diff --git a/python/pakfire/cgroup.py b/python/pakfire/cgroup.py new file mode 100644 index 000000000..3a1aa25a5 --- /dev/null +++ b/python/pakfire/cgroup.py @@ -0,0 +1,316 @@ +#!/usr/bin/python + +import os +import shutil +import signal +import time + +import logging +log = logging.getLogger("pakfire.cgroups") + +CGROUP_PATH_CANDIDATES = ( + "/sys/fs/cgroup/systemd/system", + "/sys/fs/cgroup", +) + +def find_cgroup_path(): + """ + This function tries to find the right place + where to put the cgroups. + """ + for path in CGROUP_PATH_CANDIDATES: + check_path = os.path.join(path, "tasks") + if not os.path.exists(check_path): + continue + + return path + +CGROUP_PATH = find_cgroup_path() + +def supported(): + """ + Returns True or False depending on + whether cgroups are supported or not. + """ + if CGROUP_PATH is None: + return False + + return True + +class CGroup(object): + def __init__(self, name): + assert supported(), "cgroups are not supported by this kernel" + + self.name = name + self.path = os.path.join(CGROUP_PATH, name) + self.path = os.path.abspath(self.path) + + # The parent cgroup. + self._parent = None + + # Initialize the cgroup. + self.create() + + log.debug("cgroup '%s' has been successfully initialized." % self.name) + + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__, self.name) + + def __cmp__(self, other): + return cmp(self.path, other.path) + + def create(self): + """ + Creates the filesystem structure for + the cgroup. 
+ """ + if os.path.exists(self.path): + return + + log.debug("cgroup '%s' has been created." % self.name) + os.makedirs(self.path) + + def attach(self): + """ + Attaches this task to the cgroup. + """ + pid = os.getpid() + self.attach_task(pid) + + def destroy(self): + """ + Deletes the cgroup. + + All running tasks will be migrated to the parent cgroup. + """ + # Don't delete the root cgroup. + if self == self.root: + return + + # Move all tasks to the parent. + self.migrate(self.parent) + + # Just make sure the statement above worked. + assert self.is_empty(recursive=True), "cgroup must be empty to be destroyed" + assert not self.processes + + # Remove the file tree. + try: + os.rmdir(self.path) + except OSError, e: + # Ignore "Device or resource busy". + if e.errno == 16: + return + + raise + + def _read(self, file): + """ + Reads the contect of file in the cgroup directory. + """ + file = os.path.join(self.path, file) + + with open(file) as f: + return f.read() + + def _read_pids(self, file): + """ + Reads file and interprets the lines as a sorted list. + """ + _pids = self._read(file) + + pids = [] + + for pid in _pids.splitlines(): + try: + pid = int(pid) + except ValueError: + continue + + if pid in pids: + continue + + pids.append(pid) + + return sorted(pids) + + def _write(self, file, what): + """ + Writes what to file in the cgroup directory. + """ + file = os.path.join(self.path, file) + + f = open(file, "w") + f.write("%s" % what) + f.close() + + @property + def root(self): + if self.parent: + return self.parent.root + + return self + + @property + def parent(self): + # Cannot go above CGROUP_PATH. 
+ if self.path == CGROUP_PATH: + return + + if self._parent is None: + parent_name = os.path.dirname(self.name) + self._parent = CGroup(parent_name) + + return self._parent + + @property + def subgroups(self): + subgroups = [] + + for name in os.listdir(self.path): + path = os.path.join(self.path, name) + if not os.path.isdir(path): + continue + + name = os.path.join(self.name, name) + group = CGroup(name) + + subgroups.append(group) + + return subgroups + + def is_empty(self, recursive=False): + """ + Returns True if the cgroup is empty. + + Otherwise returns False. + """ + if self.tasks: + return False + + if recursive: + for subgroup in self.subgroups: + if subgroup.is_empty(recursive=recursive): + continue + + return False + + return True + + @property + def tasks(self): + """ + Returns a list of pids of all tasks + in this process group. + """ + return self._read_pids("tasks") + + @property + def processes(self): + """ + Returns a list of pids of all processes + that are currently running within the cgroup. + """ + return self._read_pids("cgroup.procs") + + def attach_task(self, pid): + """ + Attaches the task with the given PID to + the cgroup. + """ + self._write("tasks", pid) + + def migrate_task(self, other, pid): + """ + Migrates a single task to another cgroup. + """ + other.attach_task(pid) + + def migrate(self, other): + if self.is_empty(recursive=True): + return + + log.info("Migrating all tasks from '%s' to '%s'." \ + % (self.name, other.name)) + + while True: + # Migrate all tasks to the new cgroup. + for task in self.tasks: + self.migrate_task(other, task) + + # Also do that for all subgroups. + for subgroup in self.subgroups: + subgroup.migrate(other) + + if self.is_empty(): + break + + def kill(self, sig=signal.SIGTERM, recursive=True): + killed_processes = [] + + mypid = os.getpid() + + while True: + for proc in self.processes: + # Don't kill myself. + if proc == mypid: + continue + + # Skip all processes that have already been killed. 
+ if proc in killed_processes: + continue + + # If we haven't killed the process yet, we kill it. + log.debug("Sending signal %s to process %s..." % (sig, proc)) + + try: + os.kill(proc, sig) + except OSError, e: + raise + + # Save all killed processes to a list. + killed_processes.append(proc) + + else: + # If no processes are left to be killed, we end the loop. + break + + # Nothing more to do if not in recursive mode. + if not recursive: + return + + # Kill all processes in subgroups as well. + for subgroup in self.subgroups: + subgroup.kill(sig=sig, recursive=recursive) + + def kill_and_wait(self): + # Safely kill all processes in the cgroup. + # This first sends SIGTERM and then checks 8 times + # after 200ms whether the group is empty. If not, + # everything what's still in there gets SIGKILL + # and it is five more times checked if everything + # went away. + + sig = None + for i in range(15): + if i == 0: + sig = signal.SIGTERM + elif i == 9: + sig = signal.SIGKILL + else: + sig = None + + # If no signal is given and there are no processes + # left, our job is done and we can exit. + if not self.processes: + break + + if sig: + # Send sig to all processes in the cgroup. + log.info("Sending signal %s to all processes in '%s'." % (sig, self.name)) + self.kill(sig=sig, recursive=True) + + # Sleep for 200ms. 
+ time.sleep(0.2) + + return self.is_empty() diff --git a/python/pakfire/chroot.py b/python/pakfire/chroot.py index c620fdb8c..521759b10 100644 --- a/python/pakfire/chroot.py +++ b/python/pakfire/chroot.py @@ -79,7 +79,7 @@ def logOutput(fds, logger, returnOutput=1, start=0, timeout=0): return output -def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True, returnOutput=0, personality=None, logger=None, env=None, *args, **kargs): +def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True, returnOutput=0, personality=None, logger=None, env=None, cgroup=None, *args, **kargs): # Save the output of command output = "" @@ -93,8 +93,6 @@ def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True logger.debug("Executing command: %s in %s" % (command, chrootPath or "/")) try: - child = None - # Create new child process child = subprocess.Popen( command, @@ -107,6 +105,10 @@ def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True env=env ) + # If cgroup is given, attach the subprocess. + if cgroup: + cgroup.attach_task(child.pid) + # use select() to poll for output so we dont block output = logOutput([child.stdout, child.stderr], logger, returnOutput, start, timeout) -- 2.39.5