import os
import re
import shutil
+import signal
import socket
import tempfile
import time
import uuid
import base
+import cgroup
import chroot
import logger
import packages
# If no logile was given, we use the root logger.
self.log = logging.getLogger("pakfire")
+ # Initialize a cgroup (if supported).
+ self.cgroup = None
+ if cgroup.supported():
+ self.cgroup = cgroup.CGroup("pakfire/builder/%s" % self.build_id)
+
+ # Attach the pakfire-builder process to the parent group.
+ self.cgroup.parent.attach()
+
# Log information about pakfire and some more information, when we
# are running in release mode.
if self.mode == "release":
self.extract()
def stop(self):
- # Kill all still running processes.
- util.orphans_kill(self.path)
+ if self.cgroup:
+ # Kill all still running processes in the cgroup.
+ self.cgroup.kill_and_wait()
+
+ # Remove cgroup and all parent cgroups if they are empty.
+ self.cgroup.migrate_task(self.cgroup.root, os.getpid())
+ self.cgroup.destroy()
+
+ parent = self.cgroup.parent
+ while parent:
+ if not parent.is_empty(recursive=True):
+ break
+
+ parent.destroy()
+ parent = parent.parent
+
+ else:
+ util.orphans_kill(self.path)
# Close pakfire instance.
del self.pakfire
if not kwargs.has_key("chrootPath"):
kwargs["chrootPath"] = self.chrootPath()
+ if not kwargs.has_key("cgroup"):
+ kwargs["cgroup"] = self.cgroup
+
ret = chroot.do(
command,
personality=personality,
return environ
- def do(self, command, shell=True, personality=None, cwd=None, *args, **kwargs):
+ def do(self, command, shell=True, *args, **kwargs):
try:
logger = kwargs["logger"]
except KeyError:
log.debug(" %s=%s" % (k, v))
# Update personality it none was set
- if not personality:
- personality = self.distro.personality
+ if not kwargs.has_key("personality"):
+ kwargs["personality"] = self.distro.personality
- if not cwd:
- cwd = "/%s" % LOCAL_TMP_PATH
+ if not kwargs.has_key("cwd"):
+ kwargs["cwd"] = "/%s" % LOCAL_TMP_PATH
# Make every shell to a login shell because we set a lot of
# environment things there.
if shell:
command = ["bash", "--login", "-c", command]
+ kwargs["shell"] = False
- return chroot.do(
- command,
- personality=personality,
- shell=False,
- env=self.environ,
- cwd=cwd,
- *args,
- **kwargs
- )
+ kwargs["env"] = self.environ
+
+ try:
+ return chroot.do(command, *args, **kwargs)
+ except Error:
+ if not logger:
+ logger = logging.getLogger("pakfire")
+
+ logger.error("Command exited with an error: %s" % command)
+ raise
def run_script(self, script, *args):
if not script.startswith("/"):
--- /dev/null
+#!/usr/bin/python
+
+import os
+import shutil
+import signal
+import time
+
+import logging
+log = logging.getLogger("pakfire.cgroups")
+
+CGROUP_PATH_CANDIDATES = (
+ "/sys/fs/cgroup/systemd/system",
+ "/sys/fs/cgroup",
+)
+
+def find_cgroup_path():
+ """
+ This function tries to find the right place
+ where to put the cgroups.
+ """
+ for path in CGROUP_PATH_CANDIDATES:
+ check_path = os.path.join(path, "tasks")
+ if not os.path.exists(check_path):
+ continue
+
+ return path
+
+CGROUP_PATH = find_cgroup_path()
+
+def supported():
+ """
+ Returns True or False depending on
+ whether cgroups are supported or not.
+ """
+ if CGROUP_PATH is None:
+ return False
+
+ return True
+
+class CGroup(object):
+ def __init__(self, name):
+ assert supported(), "cgroups are not supported by this kernel"
+
+ self.name = name
+ self.path = os.path.join(CGROUP_PATH, name)
+ self.path = os.path.abspath(self.path)
+
+ # The parent cgroup.
+ self._parent = None
+
+ # Initialize the cgroup.
+ self.create()
+
+ log.debug("cgroup '%s' has been successfully initialized." % self.name)
+
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.name)
+
+ def __cmp__(self, other):
+ return cmp(self.path, other.path)
+
+ def create(self):
+ """
+ Creates the filesystem structure for
+ the cgroup.
+ """
+ if os.path.exists(self.path):
+ return
+
+ log.debug("cgroup '%s' has been created." % self.name)
+ os.makedirs(self.path)
+
+ def attach(self):
+ """
+ Attaches this task to the cgroup.
+ """
+ pid = os.getpid()
+ self.attach_task(pid)
+
+ def destroy(self):
+ """
+ Deletes the cgroup.
+
+ All running tasks will be migrated to the parent cgroup.
+ """
+ # Don't delete the root cgroup.
+ if self == self.root:
+ return
+
+ # Move all tasks to the parent.
+ self.migrate(self.parent)
+
+ # Just make sure the statement above worked.
+ assert self.is_empty(recursive=True), "cgroup must be empty to be destroyed"
+ assert not self.processes
+
+ # Remove the file tree.
+ try:
+ os.rmdir(self.path)
+ except OSError, e:
+ # Ignore "Device or resource busy".
+ if e.errno == 16:
+ return
+
+ raise
+
+ def _read(self, file):
+ """
+ Reads the contect of file in the cgroup directory.
+ """
+ file = os.path.join(self.path, file)
+
+ with open(file) as f:
+ return f.read()
+
+ def _read_pids(self, file):
+ """
+ Reads file and interprets the lines as a sorted list.
+ """
+ _pids = self._read(file)
+
+ pids = []
+
+ for pid in _pids.splitlines():
+ try:
+ pid = int(pid)
+ except ValueError:
+ continue
+
+ if pid in pids:
+ continue
+
+ pids.append(pid)
+
+ return sorted(pids)
+
+ def _write(self, file, what):
+ """
+ Writes what to file in the cgroup directory.
+ """
+ file = os.path.join(self.path, file)
+
+ f = open(file, "w")
+ f.write("%s" % what)
+ f.close()
+
+ @property
+ def root(self):
+ if self.parent:
+ return self.parent.root
+
+ return self
+
+ @property
+ def parent(self):
+ # Cannot go above CGROUP_PATH.
+ if self.path == CGROUP_PATH:
+ return
+
+ if self._parent is None:
+ parent_name = os.path.dirname(self.name)
+ self._parent = CGroup(parent_name)
+
+ return self._parent
+
+ @property
+ def subgroups(self):
+ subgroups = []
+
+ for name in os.listdir(self.path):
+ path = os.path.join(self.path, name)
+ if not os.path.isdir(path):
+ continue
+
+ name = os.path.join(self.name, name)
+ group = CGroup(name)
+
+ subgroups.append(group)
+
+ return subgroups
+
+ def is_empty(self, recursive=False):
+ """
+ Returns True if the cgroup is empty.
+
+ Otherwise returns False.
+ """
+ if self.tasks:
+ return False
+
+ if recursive:
+ for subgroup in self.subgroups:
+ if subgroup.is_empty(recursive=recursive):
+ continue
+
+ return False
+
+ return True
+
+ @property
+ def tasks(self):
+ """
+ Returns a list of pids of all tasks
+ in this process group.
+ """
+ return self._read_pids("tasks")
+
+ @property
+ def processes(self):
+ """
+ Returns a list of pids of all processes
+ that are currently running within the cgroup.
+ """
+ return self._read_pids("cgroup.procs")
+
+ def attach_task(self, pid):
+ """
+ Attaches the task with the given PID to
+ the cgroup.
+ """
+ self._write("tasks", pid)
+
+ def migrate_task(self, other, pid):
+ """
+ Migrates a single task to another cgroup.
+ """
+ other.attach_task(pid)
+
+ def migrate(self, other):
+ if self.is_empty(recursive=True):
+ return
+
+ log.info("Migrating all tasks from '%s' to '%s'." \
+ % (self.name, other.name))
+
+ while True:
+ # Migrate all tasks to the new cgroup.
+ for task in self.tasks:
+ self.migrate_task(other, task)
+
+ # Also do that for all subgroups.
+ for subgroup in self.subgroups:
+ subgroup.migrate(other)
+
+ if self.is_empty():
+ break
+
+ def kill(self, sig=signal.SIGTERM, recursive=True):
+ killed_processes = []
+
+ mypid = os.getpid()
+
+ while True:
+ for proc in self.processes:
+ # Don't kill myself.
+ if proc == mypid:
+ continue
+
+ # Skip all processes that have already been killed.
+ if proc in killed_processes:
+ continue
+
+ # If we haven't killed the process yet, we kill it.
+ log.debug("Sending signal %s to process %s..." % (sig, proc))
+
+ try:
+ os.kill(proc, sig)
+ except OSError, e:
+ raise
+
+ # Save all killed processes to a list.
+ killed_processes.append(proc)
+
+ else:
+ # If no processes are left to be killed, we end the loop.
+ break
+
+ # Nothing more to do if not in recursive mode.
+ if not recursive:
+ return
+
+ # Kill all processes in subgroups as well.
+ for subgroup in self.subgroups:
+ subgroup.kill(sig=sig, recursive=recursive)
+
+ def kill_and_wait(self):
+ # Safely kill all processes in the cgroup.
+ # This first sends SIGTERM and then checks 8 times
+ # after 200ms whether the group is empty. If not,
+ # everything what's still in there gets SIGKILL
+ # and it is five more times checked if everything
+ # went away.
+
+ sig = None
+ for i in range(15):
+ if i == 0:
+ sig = signal.SIGTERM
+ elif i == 9:
+ sig = signal.SIGKILL
+ else:
+ sig = None
+
+ # If no signal is given and there are no processes
+ # left, our job is done and we can exit.
+ if not self.processes:
+ break
+
+ if sig:
+ # Send sig to all processes in the cgroup.
+ log.info("Sending signal %s to all processes in '%s'." % (sig, self.name))
+ self.kill(sig=sig, recursive=True)
+
+ # Sleep for 200ms.
+ time.sleep(0.2)
+
+ return self.is_empty()
return output
-def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True, returnOutput=0, personality=None, logger=None, env=None, *args, **kargs):
+def do(command, shell=False, chrootPath=None, cwd=None, timeout=0, raiseExc=True, returnOutput=0, personality=None, logger=None, env=None, cgroup=None, *args, **kargs):
# Save the output of command
output = ""
logger.debug("Executing command: %s in %s" % (command, chrootPath or "/"))
try:
- child = None
-
# Create new child process
child = subprocess.Popen(
command,
env=env
)
+ # If cgroup is given, attach the subprocess.
+ if cgroup:
+ cgroup.attach_task(child.pid)
+
# use select() to poll for output so we dont block
output = logOutput([child.stdout, child.stderr], logger, returnOutput, start, timeout)