# along with this library; if not, see <http://www.gnu.org/licenses>.
#
-import consts
-import copy
+from container import ContainerError
from controller import Controller
-from enum import Enum
-import multiprocessing as mp
-import os
from run import Run, RunError
-import time
+import multiprocessing as mp
+from enum import Enum
+import consts
import utils
+import time
+import copy
+import os
+
class CgroupMount(object):
def __init__(self, mount_line):
entries = mount_line.split()
- if entries[2] == "cgroup":
+ if entries[2] == 'cgroup':
self.version = CgroupVersion.CGROUP_V1
- elif entries[2] == "cgroup2":
+ elif entries[2] == 'cgroup2':
self.version = CgroupVersion.CGROUP_V2
else:
- raise ValueError("Unknown cgroup version")
+ raise ValueError('Unknown cgroup version')
self.mount_point = entries[1]
if self.version == CgroupVersion.CGROUP_V1:
self.controller = entries[3].split(',')[-1]
- if self.controller == "clone_children":
+ if self.controller == 'clone_children':
# the cpuset controller may append this option to the end
# rather than the controller name like all other controllers
- self.controller = "cpuset"
+ self.controller = 'cpuset'
def __str__(self):
- out_str = "CgroupMount"
- out_str += "\n\tMount Point = {}".format(self.mount_point)
- out_str += "\n\tCgroup Version = {}".format(self.version)
+ out_str = 'CgroupMount'
+ out_str += '\n\tMount Point = {}'.format(self.mount_point)
+ out_str += '\n\tCgroup Version = {}'.format(self.version)
if self.controller is not None:
- out_str += "\n\tController = {}".format(self.controller)
+ out_str += '\n\tController = {}'.format(self.controller)
return out_str
+
class CgroupVersion(Enum):
CGROUP_UNK = 0
CGROUP_V1 = 1
if option == controller:
return CgroupVersion.CGROUP_V1
elif line.split()[0] == 'cgroup2':
- with open(os.path.join(mnt_path, 'cgroup.controllers'), 'r') as ctrlf:
+ ctrlf_path = os.path.join(mnt_path, 'cgroup.controllers')
+ with open(ctrlf_path, 'r') as ctrlf:
controllers = ctrlf.readline()
for ctrl in controllers.split():
if ctrl == controller:
return CgroupVersion.CGROUP_V2
- raise IndexError("Unknown version for controller {}".format(controller))
+ raise IndexError(
+ 'Unknown version for controller {}'
+ ''.format(controller)
+ )
+
class Cgroup(object):
# This class is analogous to libcgroup's struct cgroup
self.children = list()
def __str__(self):
- out_str = "Cgroup {}\n".format(self.name)
+ out_str = 'Cgroup {}\n'.format(self.name)
for ctrl_key in self.controllers:
out_str += utils.indent(str(self.controllers[ctrl_key]), 4)
cmd.append('{}={}'.format(setting, value))
elif isinstance(setting, list) and isinstance(value, list):
if len(setting) != len(value):
- raise ValueError('Settings list length must equal values list length')
+ raise ValueError(
+ 'Settings list length must equal '
+ 'values list length'
+ )
for idx, stg in enumerate(setting):
cmd.append('-r')
cmd.append('{}={}'.format(stg, value[idx]))
else:
- raise ValueError('Invalid inputs to cgget:\nsetting: {}\n' \
- 'value{}'.format(setting, value))
+ raise ValueError(
+ 'Invalid inputs to cgget:\nsetting: {}\n'
+ 'value{}'
+ ''.format(setting, value)
+ )
if copy_from is not None:
cmd.append('--copy-from')
try:
ret = Run.run(cmd)
except RunError as re:
- if "profiling" in re.stderr:
+ if 'profiling' in re.stderr:
ret = re.stdout
else:
raise re
Run.run(cmd)
@staticmethod
- # given a stdout of cgsnapshot-like data, create a dictionary of cgroup objects
+ # given a stdout of cgsnapshot-like data, create a dictionary of cgroup
+ # objects
def snapshot_to_dict(cgsnapshot_stdout):
cgdict = dict()
line = line.strip()
if mode == parsemode.UNKNOWN:
- if line.startswith("#"):
+ if line.startswith('#'):
continue
- elif line.startswith("group") and line.endswith("{"):
+ elif line.startswith('group') and line.endswith('{'):
cg_name = line.split()[1]
if cg_name in cgdict:
# We already have a cgroup with this name. This block
mode = parsemode.GROUP
elif mode == parsemode.GROUP:
- if line.startswith("perm {"):
+ if line.startswith('perm {'):
mode = parsemode.PERM
- elif line.endswith("{"):
+ elif line.endswith('{'):
ctrl_name = line.split()[0]
cg.controllers[ctrl_name] = Controller(ctrl_name)
mode = parsemode.CONTROLLER
- elif line.endswith("}"):
+ elif line.endswith('}'):
# we've found the end of this group
cgdict[cg_name] = cg
mode = parsemode.UNKNOWN
elif mode == parsemode.CONTROLLER:
- if line.endswith("\";"):
+ if line.endswith('";'):
# this is a setting on a single line
- setting = line.split("=")[0]
- value = line.split("=")[1]
+ setting = line.split('=')[0]
+ value = line.split('=')[1]
cg.controllers[ctrl_name].settings[setting] = value
- elif line.endswith("}"):
+ elif line.endswith('}'):
# we've found the end of this controller
mode = parsemode.GROUP
else:
# this is a multi-line setting
- setting = line.split("=")[0]
- value = "{}\n".format(line.split("=")[1])
+ setting = line.split('=')[0]
+ value = '{}\n'.format(line.split('=')[1])
mode = parsemode.SETTING
elif mode == parsemode.SETTING:
- if line.endswith("\";"):
+ if line.endswith('";'):
# this is the last line of the multi-line setting
value += line
mode = parsemode.CONTROLLER
else:
- value += "{}\n".format(line)
+ value += '{}\n'.format(line)
elif mode == parsemode.PERM:
- if line.startswith("admin {"):
+ if line.startswith('admin {'):
mode = parsemode.ADMIN
- elif line.startswith("task {"):
+ elif line.startswith('task {'):
mode = parsemode.TASK
- elif line.endswith("}"):
+ elif line.endswith('}'):
mode = parsemode.GROUP
elif mode == parsemode.ADMIN or mode == parsemode.TASK:
# todo - handle these modes
- if line.endswith("}"):
+ if line.endswith('}'):
mode = parsemode.PERM
return cgdict
# ensure the deny list file exists
if config.args.container:
try:
- config.container.run(['sudo', 'touch', '/etc/cgsnapshot_blacklist.conf'])
+ config.container.run(
+ ['sudo',
+ 'touch',
+ '/etc/cgsnapshot_blacklist.conf']
+ )
except RunError as re:
- if re.ret == 0 and "unable to resolve host" in re.stderr:
+ if re.ret == 0 and 'unable to resolve host' in re.stderr:
pass
else:
Run.run(['sudo', 'touch', '/etc/cgsnapshot_blacklist.conf'])
res = Run.run(cmd)
except RunError as re:
if re.ret == 0 and \
- "neither blacklisted nor whitelisted" in re.stderr:
+ 'neither blacklisted nor whitelisted' in re.stderr:
res = re.stdout
# convert the cgsnapshot stdout to a dict of cgroup objects
config.container.run(cmd, shell_bool=True)
else:
Run.run(cmd, shell_bool=True)
- except:
- # todo - check the errno to ensure the directory exists rather
+ except RunError as re:
+ # check the errno to ensure the directory exists rather
# than receiving a different error
- pass
+ if re.ret == 1 and \
+ 'File exists' in re.stderr:
+ pass
+ else:
+ raise re
cmd2 = list()
cmd.append('-n')
if config.args.container:
- raise ValueError("Running cgrules within a container is not supported")
+ raise ValueError(
+ 'Running cgrules within a container is not '
+ 'supported'
+ )
else:
Run.run(cmd, shell_bool=True)
config.container.run(cmd, shell_bool=True)
else:
Run.run(cmd, shell_bool=True)
- except:
+ except (RunError, ContainerError):
# ignore any errors during the kill command. this is belt
# and suspenders code
pass
if line.split()[0] == 'cgroup':
for option in line.split()[3].split(','):
if option == ctrl_name:
- return line.split()[1]
+ return mnt_path
- raise IndexError("Unknown mount point for controller {}".format(ctrl_name))
+ raise IndexError(
+ 'Unknown mount point for controller {}'
+ ''.format(ctrl_name)
+ )
@staticmethod
def __get_controller_mount_point_v2(ctrl_name):
if controller == ctrl_name:
return mnt_path
- raise IndexError("Unknown mount point for controller {}".format(ctrl_name))
+ raise IndexError(
+ 'Unknown mount point for controller {}'
+ ''.format(ctrl_name)
+ )
@staticmethod
def get_controller_mount_point(ctrl_name):
elif vers == CgroupVersion.CGROUP_V2:
return Cgroup.__get_controller_mount_point_v2(ctrl_name)
else:
- raise ValueError("Unsupported cgroup version")
+ raise ValueError('Unsupported cgroup version')
@staticmethod
- def clear(config, empty=False, cghelp=False, load_file=None, load_dir=None):
+ def clear(config, empty=False, cghelp=False, load_file=None,
+ load_dir=None):
cmd = list()
if not config.args.container:
try:
ret = Run.run(cmd)
except RunError as re:
- if "profiling" in re.stderr:
+ if 'profiling' in re.stderr:
ret = re.stdout
else:
raise re
for line in mntf.readlines():
entry = line.split()
- if entry[0] != "cgroup" and entry[0] != "cgroup2":
+ if entry[0] != 'cgroup' and entry[0] != 'cgroup2':
continue
mount = CgroupMount(line)
if mount.version == CgroupVersion.CGROUP_V1 or \
- expand_v2_mounts == False:
+ expand_v2_mounts is False:
mount_list.append(mount)
continue
with open(os.path.join(mount.mount_point,
- "cgroup.controllers")) as ctrlf:
+ 'cgroup.controllers')) as ctrlf:
for line in ctrlf.readlines():
for ctrl in line.split():
mount_copy = copy.deepcopy(mount)
try:
ret = Run.run(cmd)
except RunError as re:
- if "profiling" in re.stderr:
+ if 'profiling' in re.stderr:
ret = re.stdout
else:
raise re
for mount in mounts:
if mount.controller == controller:
- proc_file = os.path.join(mount.mount_point, cgroup, "cgroup.procs")
+ proc_file = os.path.join(
+ mount.mount_point,
+ cgroup,
+ 'cgroup.procs'
+ )
cmd = ['cat', proc_file]
if config.args.container:
values_only=True)
if value != expected_value:
- raise CgroupError("cgget expected {} but received {}".format(
+ raise CgroupError('cgget expected {} but received {}'.format(
expected_value, value))
@staticmethod
Cgroup.set(config, cgname, setting, value)
Cgroup.get_and_validate(config, cgname, setting, value)
+
class CgroupError(Exception):
def __init__(self, message):
super(CgroupError, self).__init__(message)