import xml.etree.ElementTree as ET
-import inspect
import os
import configparser
+from disk import disk
+
class log():
def __init__(self, log_level):
self.log_level = log_level
return self.connection
-class vm():
- def __init__(self, vm_xml_file, snapshot_xml_file, image, root_uid):
+class machine():
+ def __init__(self, vm_xml_file, snapshot_xml_file, image, root_uid, username, password):
self.log = log(4)
self.con = libvirt_con("qemu:///system")
try:
self.log.error("No such file: {}".format(self.image))
self.root_uid = root_uid
+ self.disk = disk(image)
+
+ self.username = username
+ self.password = password
def define(self):
self.dom = self.con.con.defineXML(self.vm_xml)
raise BaseException
def revert_snapshot(self):
- print(inspect.getmembers(self.dom, predicate=inspect.ismethod))
self.dom.revertToSnapshot(self.snapshot)
self.snapshot.delete()
return elem.get("path")
def check_is_booted_up(self):
- serial_con = connection(self.get_serial_device())
+ serial_con = serial_connection(self.get_serial_device())
serial_con.write("\n")
# This will block till the domain is booted up
#serial_con.close()
- def login(self, username, password):
+ def login(self):
try:
- self.serial_con = connection(self.get_serial_device(), username="root")
- self.serial_con.login("25814@root")
+ self.serial_con = serial_connection(self.get_serial_device(), username=self.username)
+ self.serial_con.login(self.password)
except BaseException as e:
self.log.error("Could not connect to the domain via serial console")
def cmd(self, cmd):
return self.serial_con.command(cmd)
+ def copy_in(self, fr, to):
+ try:
+ self.disk.mount(self.root_uid, "/")
+ self.disk.copy_in(fr, to)
+ except BaseException as e:
+ self.log.error(e)
+ finally:
+ self.disk.umount("/")
+ self.disk.close()
-class connection():
+class serial_connection():
def __init__(self, device, username=None):
self.buffer = b""
self.back_at_prompt_pattern = None
return self.con.in_waiting
+ def line_in_buffer(self):
+ if b"\n" in self.buffer:
+ return True
- def readline2(self, pattern=None):
- string = ""
- string2 = b""
- if pattern:
- pattern = re.compile(pattern)
-
- while 1:
- char = self.con.read(1)
- string = string + char.decode("utf-8")
- string2 = string2 + char
- #print(char)
- print(char.decode("utf-8"), end="")
-
- #print(string2)
- if pattern and pattern.match(string):
- #print("get here1")
- #print(string2)
- return {"string" : string, "return-code" : 1}
-
- if char == b"\n":
- #print(char)
- #print(string2)
- #print("get here2")
- return {"return-code" : 0}
-
- def check_logged_in(self, username):
- pattern = "^\[" + username + "@.+\]#"
- data = self.readline(pattern=pattern)
- if data["return-code"] == 1:
- print("We are logged in")
- return True
- else:
- print("We are not logged in")
- return False
+ return False
+
+ def print_lines_in_buffer(self):
+ while True:
+ self.log.debug("Fill buffer ...")
+ self.peek(len(self.buffer) + self.in_waiting)
+ self.log.debug("Current buffer length: {}".format(len(self.buffer)))
+ if self.line_in_buffer() == True:
+ while self.line_in_buffer() == True:
+ data = self.readline()
+ self.log_console_line(data.decode())
+ else:
+ self.log.debug("We have printed all lines in the buffer")
+ break
def login(self, password):
if self.username == None:
self.log.error("Username cannot be blank")
return False
+ self.print_lines_in_buffer()
+
# Hit enter to see what we get
self.con.write(b'\n')
# We get two new lines \r\n ?
data = self.readline()
self.log_console_line(data.decode())
+ self.print_lines_in_buffer()
if self.back_at_prompt():
self.log.debug("We are already logged in.")
# Read all line till we get login:
while 1:
- data = self.peek()
- if not data.decode() == "l":
- self.log.debug("We get no l at the start")
- self.log_console_line(self.readline().decode())
-
# We need to use self.in_waiting because with self.con.in_waiting we get
# not the complete string
size = len(self.buffer) + self.in_waiting
self.con.flush()
def command(self, command):
- self.write("{}\n".format(command))
+ self.write("{}; echo \"END: $?\"\n".format(command))
# We need to read out the prompt for this command first
# If we do not do this we will break the loop immediately
data = self.readline()
self.log_console_line(data.decode())
+ # We saved our exit code in data (the last line)
+ self.log.debug(data.decode())
+ data = data.decode().replace("END: ", "")
+ self.log.debug(data)
+ self.log.debug(data.strip())
+ return data.strip()
+
# A class which define and undefine a virtual network based on an xml file
class network():
# Should read the test, check if the syntax are valid
# and return tuples with the ( host, command ) structure
class recipe():
- def __init__(self, path):
+ def __init__(self, path, circle=[]):
self.log = log(4)
self.recipe_file = path
+ self.path = os.path.dirname(self.recipe_file)
+ self.log.debug("Path of recipe is: {}".format(self.recipe_file))
self._recipe = None
+ self._machines = None
+
+ self.in_recursion = True
+ if len(circle) == 0:
+ self.in_recursion = False
+
+ self.circle = circle
+ self.log.debug(circle)
+ self.log.debug(self.circle)
if not os.path.isfile(self.recipe_file):
self.log.error("No such file: {}".format(self.recipe_file))
return self._recipe
+ @property
+ def machines(self):
+ if not self._machines:
+ self._machines = []
+ for line in self._recipe:
+ if line[0] != "all" and line[0] not in self._machines:
+ self._machines.append(line[0])
+
+ return self._machines
+
def parse(self):
self._recipe = []
i = 1
if len(raw_line) < 2:
self.log.error("Error parsing the recipe in line {}".format(i))
raise RecipeExeption
- cmd = raw_line[1]
+ cmd = raw_line[1].strip()
raw_line = raw_line[0].strip().split(" ")
if len(raw_line) == 0:
self.log.error("Failed to parse the recipe in line {}".format(i))
raise RecipeExeption
- elif len(raw_line) == 1:
- if raw_line[0] == "":
+
+ if raw_line[0].strip() == "":
self.log.error("Failed to parse the recipe in line {}".format(i))
raise RecipeExeption
- machine = raw_line[0]
+
+ machine = raw_line[0].strip()
+
+ if len(raw_line) == 2:
+ extra = raw_line[1].strip()
+ else:
extra = ""
- elif len(raw_line) == 2:
- machine = raw_line[0]
- extra = raw_line[1]
- self._recipe.append((machine.strip(), extra.strip(), cmd.strip()))
+ # We could get a machine here or a include statement
+ if machine == "include":
+ path = cmd.strip()
+ path = os.path.normpath(self.path + "/" + path)
+ path = path + "/recipe"
+ if path in self.circle:
+ self.log.error("Detect import loop!")
+ raise RecipeExeption
+ self.circle.append(path)
+ recipe_to_include = recipe(path, circle=self.circle)
+
+ if machine == "include":
+ self._recipe.extend(recipe_to_include.recipe)
+ else:
+ # Support also something like 'alice,bob: echo'
+ machines = machine.split(",")
+ for machine in machines:
+ self._recipe.append((machine.strip(), extra.strip(), cmd.strip()))
i = i + 1
+ if not self.in_recursion:
+ tmp_recipe = []
+ for line in self._recipe:
+ if line[0] != "all":
+ tmp_recipe.append(line)
+ else:
+ for machine in self.machines:
+ tmp_recipe.append((machine.strip(), line[1], line[2]))
+
+ self._recipe = tmp_recipe
+
+
class test():
def __init__(self, path):
self.config.read(self.settings_file)
self.name = self.config["DEFAULT"]["Name"]
self.description = self.config["DEFAULT"]["Description"]
+ self.copy_to = self.config["DEFAULT"]["Copy_to"]
+ self.copy_from = self.config["DEFAULT"]["Copy_from"]
+ self.copy_from = self.copy_from.split(",")
+
+ tmp = []
+ for file in self.copy_from:
+ file = file.strip()
+ file = os.path.normpath(self.path + "/" + file)
+ tmp.append(file)
+
+ self.copy_from = tmp
self.virtual_environ_name = self.config["VIRTUAL_ENVIRONMENT"]["Name"]
self.virtual_environ_path = self.config["VIRTUAL_ENVIRONMENT"]["Path"]
self.virtual_machines = self.virtual_environ.get_machines()
def virtual_environ_start(self):
- pass
+ for name in self.virtual_environ.network_names:
+ self.virtual_networks[name].define()
+ self.virtual_networks[name].start()
+
+ for name in self.virtual_environ.machine_names:
+ self.virtual_machines[name].define()
+ self.virtual_machines[name].create_snapshot()
+ self.virtual_machines[name].copy_in(self.copy_from, self.copy_to)
+ self.virtual_machines[name].start()
+
+ self.log.debug("Try to login on all machines")
+ for name in self.virtual_environ.machine_names:
+ self.virtual_machines[name].login()
def load_recipe(self):
- pass
+ try:
+ self.recipe = recipe(self.recipe_file)
+ except BaseException:
+ self.log.error("Failed to load recipe")
+ exit(1)
+
+ def run_recipe(self):
+ for line in self.recipe.recipe:
+ return_value = self.virtual_machines[line[0]].cmd(line[2])
+ self.log.debug("Return value is: {}".format(return_value))
+ if return_value != "0" and line[1] == "":
+ self.log.error("Failed to execute command '{}' on {}, return code: {}".format(line[2],line[0], return_value))
+ return False
+ elif return_value == "0" and line[1] == "!":
+ self.log.error("Succeded to execute command '{}' on {}, return code: {}".format(line[2],line[0],return_value))
+ return False
+ else:
+ self.log.debug("Command '{}' on {} returned with: {}".format(line[2],line[0],return_value))
- def run_recipe():
- pass
+ def virtual_environ_stop(self):
+ for name in self.virtual_environ.machine_names:
+ self.virtual_machines[name].shutdown()
+ self.virtual_machines[name].revert_snapshot()
+ self.virtual_machines[name].undefine()
- def virtual_environ_stop():
- pass
+ for name in self.virtual_environ.network_names:
+ self.virtual_networks[name].undefine()
# Should return all vms and networks in a list
machines = {}
for _machine in self.machines:
self.log.debug(_machine)
- machines.setdefault(_machine, vm(
+ machines.setdefault(_machine, machine(
os.path.normpath(self.path + "/" + self.config[_machine]["xml_file"]),
- os.path.normpath(self.path + "/" + self.config[_machine]["snapshot_xml_file"])))
+ os.path.normpath(self.path + "/" + self.config[_machine]["snapshot_xml_file"]),
+ self.config[_machine]["image"],
+ self.config[_machine]["root_uid"],
+ self.config[_machine]["username"],
+ self.config[_machine]["password"]))
return machines
+ @property
+ def machine_names(self):
+ return self.machines
+
+ @property
+ def network_names(self):
+ return self.networks
+
if __name__ == "__main__":
import argparse
args = parser.parse_args()
- _recipe = recipe("/home/jonatan/python-testing-kvm/test/recipe")
currenttest = test(args.dir)
currenttest.read_settings()
- currenttest.virtual_environ_setup()
\ No newline at end of file
+ currenttest.virtual_environ_setup()
+ currenttest.load_recipe()
+ try:
+ currenttest.virtual_environ_start()
+ currenttest.run_recipe()
+ except BaseException as e:
+ print(e)
+ finally:
+ currenttest.virtual_environ_stop()
+