]> git.ipfire.org Git - pakfire.git/commitdiff
Add cgroup support.
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 5 Apr 2012 17:41:48 +0000 (19:41 +0200)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 5 Apr 2012 17:41:48 +0000 (19:41 +0200)
If the kernel of the host system has the ability to support
cgroups, pakfire can make benefit from them.

Build processes will be put into one big group.

python/pakfire/builder.py
python/pakfire/cgroup.py [new file with mode: 0644]
python/pakfire/chroot.py

index 6008d66f7df49f121d3af950dfc9d4b5aee46854..6123b4d9365d914913e6a0c098fbcc61fc8050ce 100644 (file)
@@ -25,12 +25,14 @@ import math
 import os
 import re
 import shutil
+import signal
 import socket
 import tempfile
 import time
 import uuid
 
 import base
+import cgroup
 import chroot
 import logger
 import packages
@@ -105,6 +107,14 @@ class BuildEnviron(object):
                        # If no logile was given, we use the root logger.
                        self.log = logging.getLogger("pakfire")
 
+               # Initialize a cgroup (if supported).
+               self.cgroup = None
+               if cgroup.supported():
+                       self.cgroup = cgroup.CGroup("pakfire/builder/%s" % self.build_id)
+
+                       # Attach the pakfire-builder process to the parent group.
+                       self.cgroup.parent.attach()
+
                # Log information about pakfire and some more information, when we
                # are running in release mode.
                if self.mode == "release":
@@ -218,8 +228,24 @@ class BuildEnviron(object):
                self.extract()
 
        def stop(self):
-               # Kill all still running processes.
-               util.orphans_kill(self.path)
+               if self.cgroup:
+                       # Kill all still running processes in the cgroup.
+                       self.cgroup.kill_and_wait()
+
+                       # Remove cgroup and all parent cgroups if they are empty.
+                       self.cgroup.migrate_task(self.cgroup.root, os.getpid())
+                       self.cgroup.destroy()
+
+                       parent = self.cgroup.parent
+                       while parent:
+                               if not parent.is_empty(recursive=True):
+                                       break
+
+                               parent.destroy()
+                               parent = parent.parent
+
+               else:
+                       util.orphans_kill(self.path)
 
                # Close pakfire instance.
                del self.pakfire
@@ -666,6 +692,9 @@ class BuildEnviron(object):
                if not kwargs.has_key("chrootPath"):
                        kwargs["chrootPath"] = self.chrootPath()
 
+               if not kwargs.has_key("cgroup"):
+                       kwargs["cgroup"] = self.cgroup
+
                ret = chroot.do(
                        command,
                        personality=personality,
@@ -943,7 +972,7 @@ class Builder(object):
 
                return environ
 
-       def do(self, command, shell=True, personality=None, cwd=None, *args, **kwargs):
+       def do(self, command, shell=True, *args, **kwargs):
                try:
                        logger = kwargs["logger"]
                except KeyError:
@@ -956,26 +985,28 @@ class Builder(object):
                        log.debug("  %s=%s" % (k, v))
 
                # Update personality it none was set
-               if not personality:
-                       personality = self.distro.personality
+               if not kwargs.has_key("personality"):
+                       kwargs["personality"] = self.distro.personality
 
-               if not cwd:
-                       cwd = "/%s" % LOCAL_TMP_PATH
+               if not kwargs.has_key("cwd"):
+                       kwargs["cwd"] = "/%s" % LOCAL_TMP_PATH
 
                # Make every shell to a login shell because we set a lot of
                # environment things there.
                if shell:
                        command = ["bash", "--login", "-c", command]
+                       kwargs["shell"] = False
 
-               return chroot.do(
-                       command,
-                       personality=personality,
-                       shell=False,
-                       env=self.environ,
-                       cwd=cwd,
-                       *args,
-                       **kwargs
-               )
+               kwargs["env"] = self.environ
+
+               try:
+                       return chroot.do(command, *args, **kwargs)
+               except Error:
+                       if not logger:
+                               logger = logging.getLogger("pakfire")
+
+                       logger.error("Command exited with an error: %s" % command)
+                       raise
 
        def run_script(self, script, *args):
                if not script.startswith("/"):
diff --git a/python/pakfire/cgroup.py b/python/pakfire/cgroup.py
new file mode 100644 (file)
index 0000000..3a1aa25
--- /dev/null
@@ -0,0 +1,316 @@
+#!/usr/bin/python
+
+import os
+import shutil
+import signal
+import time
+
+import logging
+log = logging.getLogger("pakfire.cgroups")
+
+CGROUP_PATH_CANDIDATES = (
+       "/sys/fs/cgroup/systemd/system",
+       "/sys/fs/cgroup",
+)
+
+def find_cgroup_path():
+       """
+               This function tries to find the right place
+               where to put the cgroups.
+       """
+       for path in CGROUP_PATH_CANDIDATES:
+               check_path = os.path.join(path, "tasks")
+               if not os.path.exists(check_path):
+                       continue
+
+               return path
+
+CGROUP_PATH = find_cgroup_path()
+
+def supported():
+       """
+               Returns True or False depending on
+               whether cgroups are supported or not.
+       """
+       if CGROUP_PATH is None:
+               return False
+
+       return True
+
+class CGroup(object):
+       def __init__(self, name):
+               assert supported(), "cgroups are not supported by this kernel"
+
+               self.name = name
+               self.path = os.path.join(CGROUP_PATH, name)
+               self.path = os.path.abspath(self.path)
+
+               # The parent cgroup.
+               self._parent = None
+
+               # Initialize the cgroup.
+               self.create()
+
+               log.debug("cgroup '%s' has been successfully initialized." % self.name)
+
+       def __repr__(self):
+               return "<%s %s>" % (self.__class__.__name__, self.name)
+
+       def __cmp__(self, other):
+               return cmp(self.path, other.path)
+
+       def create(self):
+               """
+                       Creates the filesystem structure for
+                       the cgroup.
+               """
+               if os.path.exists(self.path):
+                       return
+
+               log.debug("cgroup '%s' has been created." % self.name)
+               os.makedirs(self.path)
+
+       def attach(self):
+               """
+                       Attaches this task to the cgroup.
+               """
+               pid = os.getpid()
+               self.attach_task(pid)
+
+       def destroy(self):
+               """
+                       Deletes the cgroup.
+
+                       All running tasks will be migrated to the parent cgroup.
+               """
+               # Don't delete the root cgroup.
+               if self == self.root:
+                       return
+
+               # Move all tasks to the parent.
+               self.migrate(self.parent)
+
+               # Just make sure the statement above worked.
+               assert self.is_empty(recursive=True), "cgroup must be empty to be destroyed"
+               assert not self.processes
+
+               # Remove the file tree.
+               try:
+                       os.rmdir(self.path)
+               except OSError, e:
+                       # Ignore "Device or resource busy".
+                       if e.errno == 16:
+                               return
+
+                       raise
+
+       def _read(self, file):
+               """
+                       Reads the contect of file in the cgroup directory.
+               """
+               file = os.path.join(self.path, file)
+
+               with open(file) as f:
+                       return f.read()
+
+       def _read_pids(self, file):
+               """
+                       Reads file and interprets the lines as a sorted list.
+               """
+               _pids = self._read(file)
+
+               pids = []
+
+               for pid in _pids.splitlines():
+                       try:
+                               pid = int(pid)
+                       except ValueError:
+                               continue
+
+                       if pid in pids:
+                               continue
+
+                       pids.append(pid)
+
+               return sorted(pids)
+
+       def _write(self, file, what):
+               """
+                       Writes what to file in the cgroup directory.
+               """
+               file = os.path.join(self.path, file)
+
+               f = open(file, "w")
+               f.write("%s" % what)
+               f.close()
+
+       @property
+       def root(self):
+               if self.parent:
+                       return self.parent.root
+
+               return self
+
+       @property
+       def parent(self):
+               # Cannot go above CGROUP_PATH.
+               if self.path == CGROUP_PATH:
+                       return
+
+               if self._parent is None:
+                       parent_name = os.path.dirname(self.name)
+                       self._parent = CGroup(parent_name)
+
+               return self._parent
+
+       @property
+       def subgroups(self):
+               subgroups = []
+
+               for name in os.listdir(self.path):
+                       path = os.path.join(self.path, name)
+                       if not os.path.isdir(path):
+                               continue
+
+                       name = os.path.join(self.name, name)
+                       group = CGroup(name)
+
+                       subgroups.append(group)
+
+               return subgroups
+
+       def is_empty(self, recursive=False):
+               """
+                       Returns True if the cgroup is empty.
+
+                       Otherwise returns False.
+               """
+               if self.tasks:
+                       return False
+
+               if recursive:
+                       for subgroup in self.subgroups:
+                               if subgroup.is_empty(recursive=recursive):
+                                       continue
+
+                               return False
+
+               return True
+
+       @property
+       def tasks(self):
+               """
+                       Returns a list of pids of all tasks
+                       in this process group.
+               """
+               return self._read_pids("tasks")
+
+       @property
+       def processes(self):
+               """
+                       Returns a list of pids of all processes
+                       that are currently running within the cgroup.
+               """
+               return self._read_pids("cgroup.procs")
+
+       def attach_task(self, pid):
+               """
+                       Attaches the task with the given PID to
+                       the cgroup.
+               """
+               self._write("tasks", pid)
+
+       def migrate_task(self, other, pid):
+               """
+                       Migrates a single task to another cgroup.
+               """
+               other.attach_task(pid)
+
+       def migrate(self, other):
+               if self.is_empty(recursive=True):
+                       return
+
+               log.info("Migrating all tasks from '%s' to '%s'." \
+                       % (self.name, other.name))
+
+               while True:
+                       # Migrate all tasks to the new cgroup.
+                       for task in self.tasks:
+                               self.migrate_task(other, task)
+
+                       # Also do that for all subgroups.
+                       for subgroup in self.subgroups:
+                               subgroup.migrate(other)
+
+                       if self.is_empty():
+                               break
+
+       def kill(self, sig=signal.SIGTERM, recursive=True):
+               killed_processes = []
+
+               mypid = os.getpid()
+
+               while True:
+                       for proc in self.processes:
+                               # Don't kill myself.
+                               if proc == mypid:
+                                       continue
+
+                               # Skip all processes that have already been killed.
+                               if proc in killed_processes:
+                                       continue
+
+                               # If we haven't killed the process yet, we kill it.
+                               log.debug("Sending signal %s to process %s..." % (sig, proc))
+
+                               try:
+                                       os.kill(proc, sig)
+                               except OSError, e:
+                                       raise
+
+                               # Save all killed processes to a list.
+                               killed_processes.append(proc)
+
+                       else:
+                               # If no processes are left to be killed, we end the loop.
+                               break
+
+               # Nothing more to do if not in recursive mode.
+               if not recursive:
+                       return
+
+               # Kill all processes in subgroups as well.
+               for subgroup in self.subgroups:
+                       subgroup.kill(sig=sig, recursive=recursive)
+
+       def kill_and_wait(self):
+               # Safely kill all processes in the cgroup.
+               # This first sends SIGTERM and then checks 8 times
+               # after 200ms whether the group is empty. If not,
+               # everything what's still in there gets SIGKILL
+               # and it is five more times checked if everything
+               # went away.
+
+               sig = None
+               for i in range(15):
+                       if i == 0:
+                               sig = signal.SIGTERM
+                       elif i == 9:
+                               sig = signal.SIGKILL
+                       else:
+                               sig = None
+
+                       # If no signal is given and there are no processes
+                       # left, our job is done and we can exit.
+                       if not self.processes:
+                               break
+
+                       if sig:
+                               # Send sig to all processes in the cgroup.
+                               log.info("Sending signal %s to all processes in '%s'." % (sig, self.name))
+                               self.kill(sig=sig, recursive=True)
+
+                       # Sleep for 200ms.
+                       time.sleep(0.2)
+
+               return self.is_empty()
index c620fdb8cf1514ecdfae5df8b71f211143304ba5..521759b10e023f9699c170bd6e6e5da944d8aaac 100644 (file)
@@ -79,7 +79,7 @@ def logOutput(fds, logger, returnOutput=1, start=0, timeout=0):
        return output
 
 
-def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True, returnOutput=0, personality=None, logger=None, env=None, *args, **kargs):
+def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True, returnOutput=0, personality=None, logger=None, env=None, cgroup=None, *args, **kargs):
        # Save the output of command
        output = ""
 
@@ -93,8 +93,6 @@ def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True
                logger.debug("Executing command: %s in %s" % (command, chrootPath or "/"))
 
        try:
-               child = None
-
                # Create new child process
                child = subprocess.Popen(
                        command,
@@ -107,6 +105,10 @@ def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True
                        env=env
                )
 
+               # If cgroup is given, attach the subprocess.
+               if cgroup:
+                       cgroup.attach_task(child.pid)
+
                # use select() to poll for output so we dont block
                output = logOutput([child.stdout, child.stderr], logger, returnOutput, start, timeout)