X-Git-Url: http://git.ipfire.org/?p=nitsi.git;a=blobdiff_plain;f=test.py;h=9ea57981e5b4a47acad742d593c40fe1e965e44f;hp=de209a48b1d1f2c96230a189f62ed10ed703ddd8;hb=2e8d04732813c535a6e5b0378199643793b3176e;hpb=5e7f6db7693903d714993d8e890097bac5d57357 diff --git a/test.py b/test.py index de209a4..9ea5798 100755 --- a/test.py +++ b/test.py @@ -1,698 +1,26 @@ #!/usr/bin/python3 -import serial - -import re -from time import sleep -import sys import libvirt -import xml.etree.ElementTree as ET - -import inspect import os import configparser -class log(): - def __init__(self, log_level): - self.log_level = log_level - - def debug(self, string): - if self.log_level >= 4: - print("DEBUG: {}".format(string)) - - def error(self, string): - print("ERROR: {}".format(string)) - -class libvirt_con(): - def __init__(self, uri): - self.log = log(4) - self.uri = uri - self.connection = None - - def get_domain_from_name(self, name): - dom = self.con.lookupByName(name) - - if dom == None: - raise BaseException - return dom - - @property - def con(self): - if self.connection == None: - try: - self.connection = libvirt.open(self.uri) - except BaseException as error: - self.log.error("Could not connect to: {}".format(self.uri)) - - self.log.debug("Connected to: {}".format(self.uri)) - return self.connection - - return self.connection - - -class vm(): - def __init__(self, vm_xml_file, snapshot_xml_file, image, root_uid): - self.log = log(4) - self.con = libvirt_con("qemu:///system") - try: - with open(vm_xml_file) as fobj: - self.vm_xml = fobj.read() - except FileNotFoundError as error: - self.log.error("No such file: {}".format(vm_xml_file)) - - try: - with open(snapshot_xml_file) as fobj: - self.snapshot_xml = fobj.read() - except FileNotFoundError as error: - self.log.error("No such file: {}".format(snapshot_xml_file)) - - self.image = image - - if not os.path.isfile(self.image): - self.log.error("No such file: {}".format(self.settings_file)) - - self.root_uid = root_uid - - def define(self): - self.dom = self.con.con.defineXML(self.vm_xml) - if self.dom == None: - self.log.error("Could not define VM") - raise BaseException - - def start(self): - if self.dom.create() < 0: - self.log.error("Could not start VM") - raise BaseException - - def shutdown(self): - if self.is_running(): - if self.dom.shutdown() < 0: - self.log.error("Could not shutdown VM") - raise BaseException - else: - self.log.error("Domain is not running") - - def undefine(self): - self.dom.undefine() - - def create_snapshot(self): - - self.snapshot = self.dom.snapshotCreateXML(self.snapshot_xml) - - if not self.snapshot: - self.log.error("Could not create snapshot") - raise BaseException - - def revert_snapshot(self): - print(inspect.getmembers(self.dom, predicate=inspect.ismethod)) - self.dom.revertToSnapshot(self.snapshot) - #self.dom.SnapshotDelete(self.snapshot) - - def is_running(self): - - state, reason = self.dom.state() - - if state == libvirt.VIR_DOMAIN_RUNNING: - return True - else: - return False - - def get_serial_device(self): - - if not self.is_running(): - raise BaseException - - xml_root = ET.fromstring(self.dom.XMLDesc(0)) - - elem = xml_root.find("./devices/serial/source") - return elem.get("path") - - def check_is_booted_up(self): - serial_con = connection(self.get_serial_device()) - - serial_con.write("\n") - # This will block till the domain is booted up - serial_con.read(1) - - #serial_con.close() - - def login(self, username, password): - try: - self.serial_con = connection(self.get_serial_device(), username="root") - self.serial_con.login("25814@root") - except BaseException as e: - self.log.error("Could not connect to the domain via serial console") - - def cmd(self, cmd): - return self.serial_con.command(cmd) - - - - - -# try: -# dom = conn.lookupByUUIDString(uuid) -# except: -# flash(u"Failed to get the domain object", 'alert-danger') -# print('Failed to get the domain object', file=sys.stderr) -# conn.close() -# return redirect("/vm") - -# domname = dom.name() -# if action == "start": -# try: -# dom.create() -# except: -# flash(u"Can not boot guest domain.", 'alert-danger') -# print('Can not boot guest domain.', file=sys.stderr) -# conn.close() -# return redirect("/vm") - -# flash(u"Sucessfully started Domain \"{}\"".format(domname), 'alert-info') -# conn.close() -# return redirect("/vm") - -# elif action == "shutdown": -# try: -# dom.shutdown() -# except: -# flash(u"Can not shutdown guest domain.", 'alert-danger') -# print('Can not shutdown guest domain.', file=sys.stderr) -# conn.close() -# return redirect("/vm") - -# flash(u"Sucessfully shutdowned Domain \"{}\"".format(domname), 'alert-info') -# conn.close() -# return redirect("/vm") - -# elif action == "destroy": -# try: -# dom.destroy() -# except: -# flash(u"Can not destroy guest domain.", 'alert-danger') -# print('Can not destroy guest domain.', file=sys.stderr) -# conn.close() -# return redirect("/vm") - -# flash(u"Sucessfully destroyed Domain \"{}\"".format(domname), 'alert-info') -# conn.close() -# return redirect("/vm") - -# elif action == "pause": -# try: -# dom.suspend() -# except: -# flash(u"Can not pause guest domain.", 'alert-danger') -# print('Can not pause guest domain.', file=sys.stderr) -# conn.close() -# return redirect("/vm") - -# flash(u"Sucessfully paused Domain \"{}\"".format(domname), 'alert-info') -# conn.close() -# return redirect("/vm") - -# elif action == "resume": -# try: -# dom.resume() -# except: -# flash(u"Can not eesume guest domain.:", 'alert-danger') -# print('Can not resume guest domain.', file=sys.stderr) -# conn.close() -# return redirect("/vm") - -# flash(u"Sucessfully resumed Domain \"{}\"".format(domname), 'alert-info') -# conn.close() -# return redirect("/vm") - -# else: -# flash(u"No such action: \"{}\"".format(action), 'alert-warning') -# conn.close() -# return redirect("/vm") - - -# @vms.route('/vm') -# @login_required -# def vm_overview(): -# import sys -# import libvirt -# conn = libvirt.open('qemu:///system') -# doms = conn.listAllDomains(0) -# domains = [] - -# if len(doms) != 0: -# for dom in doms: -# domain = {} -# domain.setdefault("name", dom.name()) -# domain.setdefault("uuid", dom.UUIDString()) -# state, reason = dom.state() -# domain.setdefault("state", dom_state(state)) -# domains.append(domain) - -# conn.close() - -# return render_template("virtberry_vm_basic-vm.html", domains=domains) - - - -class connection(): - def __init__(self, device, username=None): - self.buffer = b"" - self.back_at_prompt_pattern = None - self.username = username - self.log = log(1) - self.con = serial.Serial(device) - # # Just press enter one time to see what we get - # self.con.write(b'\n') - # # We get two new lines \r\n ? - # data = self.readline() - # self.log_console_line(data.decode()) - - - # if not self.back_at_prompt(): - # self.log.debug("We need to login") - # if not self.login(password): - # self.log.error("Login failed") - # return False - # else: - # self.log.debug("We are logged in") - - - ''' in_waiting_before = 0 - sleep(1) - - while in_waiting_before != self.con.in_waiting: - in_waiting_before = self.con.in_waiting - sleep(0.5) - - print(self.con.in_waiting) - data = self.con.read(self.con.in_waiting) - print(data) - print(data.decode(),end='') - - string = 'root\n' - self.con.write(string.encode()) - self.con.flush() ''' - - ''' in_waiting_before = 0 - sleep(1) - - while in_waiting_before != self.con.in_waiting: - in_waiting_before = self.con.in_waiting - sleep(0.5) - - print(self.con.in_waiting) - data = self.con.read(self.con.in_waiting) - print(data) - print(data.decode(), end='') - - string = '25814@root\n' - self.con.write(string.encode()) - self.con.flush() - - in_waiting_before = 0 - sleep(1) - - while in_waiting_before != self.con.in_waiting: - in_waiting_before = self.con.in_waiting - sleep(0.5) - - print(self.con.in_waiting) - data = self.con.read(self.con.in_waiting) - print(data) - print(data.decode(), end='') ''' - - # check if we already logged in - # If we we get something like [root@localhost ~]# - #self.readline() - - # if not self.check_logged_in(username): - #print("Try to login") - #if self.login(username, password): - # print("Could not login") - # return False - - #pattern = "^\[" + username + "@.+\]#" - #print(pattern) - #data = self.readline(pattern=pattern) - #if data["return-code"] == 1: - # print("We are logged in") - # else: - # print("We are not logged in") - # login - - #while 1: - #data = self.readline("^.*login:") - # if data["return-code"] == 1: - # break - - # string = 'cd / && ls \n' - # self.con.write(string.encode()) - # self.con.flush() - # #print(self.con.read(5)) - - # data = self.readline() - # self.log_console_line(data.decode()) - - # while not self.back_at_prompt(): - # data = self.readline() - # self.log_console_line(data.decode()) - - ''' in_waiting_before = 0 - sleep(1) - - while in_waiting_before != self.con.in_waiting: - in_waiting_before = self.con.in_waiting - sleep(0.5) - - print(self.con.in_waiting) - data = self.con.read(self.con.in_waiting) - data = data.decode() - - pattern = "^\[" + username + "@.+\]# $" - pattern = re.compile(pattern, re.MULTILINE) - if pattern.match(data, re.MULTILINE): - print("It works") - - print(data, end='') ''' - - - #@property - #def con(self): - # return self.con - - - def read(self, size=1): - if len(self.buffer) >= size: - # throw away first size bytes in buffer - data = self.buffer[:size] - # Set the buffer to the non used bytes - self.buffer = self.buffer[size:] - return data - else: - data = self.buffer - # Set the size to the value we have to read now - size = size - len(self.buffer) - # Set the buffer empty - self.buffer = b"" - return data + self.con.read(size) - - def peek(self, size=1): - if len(self.buffer) <= size: - self.buffer += self.con.read(size=size - len(self.buffer)) - - return self.buffer[:size] - - def readline(self): - self.log.debug(self.buffer) - self.buffer = self.buffer + self.con.read(self.con.in_waiting) - if b"\n" in self.buffer: - size = self.buffer.index(b"\n") + 1 - self.log.debug("We have a whole line in the buffer") - self.log.debug(self.buffer) - self.log.debug("We split at {}".format(size)) - data = self.buffer[:size] - self.buffer = self.buffer[size:] - self.log.debug(data) - self.log.debug(self.buffer) - return data - - data = self.buffer - self.buffer = b"" - return data + self.con.readline() - - def back_at_prompt(self): - data = self.peek() - if not data == b"[": - return False - - # We need to use self.in_waiting because with self.con.in_waiting we get - # not the complete string - size = len(self.buffer) + self.in_waiting - data = self.peek(size) - - - if self.back_at_prompt_pattern == None: - #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username) - self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE) - - if self.back_at_prompt_pattern.search(data.decode()): - return True - else: - return False - - def log_console_line(self, line): - self.log.debug("Get in function log_console_line()") - sys.stdout.write(line) - - @property - def in_waiting(self): - in_waiting_before = 0 - sleep(0.5) - - while in_waiting_before != self.con.in_waiting: - in_waiting_before = self.con.in_waiting - sleep(0.5) - - return self.con.in_waiting - - - def readline2(self, pattern=None): - string = "" - string2 = b"" - if pattern: - pattern = re.compile(pattern) - - while 1: - char = self.con.read(1) - string = string + char.decode("utf-8") - string2 = string2 + char - #print(char) - print(char.decode("utf-8"), end="") - - #print(string2) - if pattern and pattern.match(string): - #print("get here1") - #print(string2) - return {"string" : string, "return-code" : 1} - - if char == b"\n": - #print(char) - #print(string2) - #print("get here2") - return {"return-code" : 0} - - def check_logged_in(self, username): - pattern = "^\[" + username + "@.+\]#" - data = self.readline(pattern=pattern) - if data["return-code"] == 1: - print("We are logged in") - return True - else: - print("We are not logged in") - return False - - def login(self, password): - if self.username == None: - self.log.error("Username cannot be blank") - return False - - # Hit enter to see what we get - self.con.write(b'\n') - # We get two new lines \r\n ? - data = self.readline() - self.log_console_line(data.decode()) - - - if self.back_at_prompt(): - self.log.debug("We are already logged in.") - return True - - # Read all line till we get login: - while 1: - data = self.peek() - if not data.decode() == "l": - self.log.debug("We get no l at the start") - self.log_console_line(self.readline().decode()) - - # We need to use self.in_waiting because with self.con.in_waiting we get - # not the complete string - size = len(self.buffer) + self.in_waiting - data = self.peek(size) - - pattern = r"^.*login: " - pattern = re.compile(pattern) - - if pattern.search(data.decode()): - break - else: - self.log.debug("The pattern does not match") - self.log_console_line(self.readline().decode()) - - # We can login - string = "{}\n".format(self.username) - self.con.write(string.encode()) - self.con.flush() - # read the login out of the buffer - data = self.readline() - self.log.debug("This is the login:{}".format(data)) - self.log_console_line(data.decode()) - - # We need to wait her till we get the full string "Password:" - #This is useless but self.in_waiting will wait the correct amount of time - size = self.in_waiting - - string = "{}\n".format(password) - self.con.write(string.encode()) - self.con.flush() - # Print the 'Password:' line - data = self.readline() - self.log_console_line(data.decode()) - - while not self.back_at_prompt(): - # This will fail if the login failed so we need to look for the failed keyword - data = self.readline() - self.log_console_line(data.decode()) - - return True - - def write(self, string): - self.log.debug(string) - self.con.write(string.encode()) - self.con.flush() - - def command(self, command): - self.write("{}\n".format(command)) - - # We need to read out the prompt for this command first - # If we do not do this we will break the loop immediately - # because the prompt for this command is still in the buffer - data = self.readline() - self.log_console_line(data.decode()) - - while not self.back_at_prompt(): - data = self.readline() - self.log_console_line(data.decode()) - - -# while True: - -# line = self.readline() - -# print (line) - -# print("Hello") - -# print("World") - -# Hello -# World - -# Hello World - -# # Peek for prompt? -# if self.back_at_prompt(): -# break - -# def back_at_prompt(): -# data = self.peek() - -# if not char == "[": -# return False - -# data = self.peek(in_waiting) -# m = re.search(..., data) -# if m: -# return True - - - - - -# pattern = r"^\[root@.+\]#" - -# pattern = re.compile(pattern, re.MULTILINE) - -# data = """cd / && ls -# bin dev home lib64 media opt root sbin sys usr -# boot etc lib lost+found mnt proc run srv tmp var -# [root@localhost /]# """ - -# #data = "[root@localhost /]# " -# data2 = pattern.search(data) - -# #if pattern.search(data): -# # print("get here") - -# #print(data2.group()) - - - -# vm = vm("qemu:///system") - -# dom = vm.domain_start_from_xml_file("/home/jonatan/python-testing-kvm/centos2.xml") - -# # This block till the vm is booted -# print("Waiting till the domain is booted up") -# vm.check_domain_is_booted_up(dom) -# print("Domain is booted up") -# #vm.domain_get_serial_device(dom) -# #vm.domain_get_serial_device(dom) - -# #centos2 = vm("/home/jonatan/python-testing-kvm/centos2.xml", "/home/jonatan/python-testing-kvm/centos2-snapshot.xml") - -# centos2.define() -# centos2.create_snapshot() -# centos2.start() -# #centos2.check_is_booted_up() -# centos2.login("root", "25814@root") -# centos2.cmd("echo 1 > test3") - - -# #versuch1 = connection(centos2.get_serial_device, username="root") -# #versuch1.login() -# #versuch1.command("cd / && ls") - -# input("Press Enter to continue...") -# centos2.shutdown() -# centos2.revert_snapshot() -# centos2.undefine() - -# A class which define and undefine a virtual network based on an xml file -class network(): - def __init__(self, path): - self.log = log(4) - self.log.debug("Path is: {}".format(path)) - -# Should read the test, check if the syntax are valid -# and return tuples with the ( host, command ) structure -class recipe(): - def __init__(self, path): - self.log = log(4) - self.recipe_file = path - if not os.path.isfile(self.recipe_file): - self.log.error("No such file: {}".format(self.recipe_file)) - - try: - with open(self.recipe_file) as fobj: - self.raw_recipe = fobj.readlines() - except FileNotFoundError as error: - self.log.error("No such file: {}".format(vm_xml_file)) - - for line in self.raw_recipe: - print(line) +from virtual_environ import virtual_environ +from recipe import recipe +import logging +logger = logging.getLogger("nitsi.test") class test(): def __init__(self, path): - self.log = log(4) try: self.path = os.path.abspath(path) + self.log = logger.getChild(os.path.basename(self.path)) except BaseException as e: - self.log.error("Could not get absolute path") + logger.error("Could not get absolute path") self.log.debug(self.path) @@ -709,6 +37,17 @@ class test(): self.config.read(self.settings_file) self.name = self.config["DEFAULT"]["Name"] self.description = self.config["DEFAULT"]["Description"] + self.copy_to = self.config["DEFAULT"]["Copy_to"] + self.copy_from = self.config["DEFAULT"]["Copy_from"] + self.copy_from = self.copy_from.split(",") + + tmp = [] + for file in self.copy_from: + file = file.strip() + file = os.path.normpath(self.path + "/" + file) + tmp.append(file) + + self.copy_from = tmp self.virtual_environ_name = self.config["VIRTUAL_ENVIRONMENT"]["Name"] self.virtual_environ_path = self.config["VIRTUAL_ENVIRONMENT"]["Path"] @@ -722,147 +61,51 @@ class test(): self.virtual_machines = self.virtual_environ.get_machines() def virtual_environ_start(self): - pass - - def load_recipe(self): - pass - - def run_recipe(): - pass + for name in self.virtual_environ.network_names: + self.virtual_networks[name].define() + self.virtual_networks[name].start() - def virtual_environ_stop(): - pass + for name in self.virtual_environ.machine_names: + self.virtual_machines[name].define() + self.virtual_machines[name].create_snapshot() + self.virtual_machines[name].copy_in(self.copy_from, self.copy_to) + self.virtual_machines[name].start() + self.log.debug("Try to login on all machines") + for name in self.virtual_environ.machine_names: + self.log.debug("Try to login on {}".format(name)) + self.virtual_machines[name].login() - - -# Should return all vms and networks in a list -# and should provide the path to the necessary xml files -class virtual_environ(): - def __init__(self, path): - self.log = log(4) + def load_recipe(self): try: - self.path = os.path.abspath(path) + self.recipe = recipe(self.recipe_file) + for line in self.recipe.recipe: + self.log.debug(line) except BaseException as e: - self.log.error("Could not get absolute path") - - self.log.debug(self.path) - - self.settings_file = "{}/settings".format(self.path) - if not os.path.isfile(self.settings_file): - self.log.error("No such file: {}".format(self.settings_file)) - - self.log.debug(self.settings_file) - self.config = configparser.ConfigParser() - self.config.read(self.settings_file) - self.name = self.config["DEFAULT"]["name"] - self.machines_string = self.config["DEFAULT"]["machines"] - self.networks_string = self.config["DEFAULT"]["networks"] - - self.machines = [] - for machine in self.machines_string.split(","): - self.machines.append(machine.strip()) - - self.networks = [] - for network in self.networks_string.split(","): - self.networks.append(network.strip()) - - self.log.debug(self.machines) - self.log.debug(self.networks) - - def get_networks(self): - networks = {} - for _network in self.networks: - self.log.debug(_network) - networks.setdefault(_network, network(os.path.normpath(self.path + "/" + self.config[_network]["xml_file"]))) - return networks - - def get_machines(self): - machines = {} - for _machine in self.machines: - self.log.debug(_machine) - machines.setdefault(_machine, vm( - os.path.normpath(self.path + "/" + self.config[_machine]["xml_file"]), - os.path.normpath(self.path + "/" + self.config[_machine]["snapshot_xml_file"]))) - - return machines - - - - - -# def command(self, command): -# self._send_command(command) - -# while True: -# line = self.readline() - -# print (line) - -# print("Hello") - -# print("World") - -# Hello -# World - -# Hello World - -# # Peek for prompt? -# if self.back_at_prompt(): -# break - -# def back_at_prompt(): -# data = self.peek() - -# if not char == "[": -# return False - -# data = self.peek(in_waiting) -# m = re.search(..., data) -# if m: -# return True - - - - - -# class connection() -# buffer = b"" - -# def read(self, size=1): -# if len(buffer) >= size: -# # throw away first size bytes in buffer -# data, buffer = buffer[:size], buffer[size:] -# return data - -# return self.serial.read(size) - -# def peek(self, size=1): -# if len(buffer) <= size: -# buffer += self.serial.read(size=size - len(buffer)) - -# return buffer[:size] - - -# def readline(self): -# buffer = buffer + self.serial.read(in_wating) -# if "\n" in buffer: -# return alle zeichen bis zum \n - -# return buffer + self.serial.readline() - + self.log.error("Failed to load recipe") + raise e + + def run_recipe(self): + for line in self.recipe.recipe: + return_value = self.virtual_machines[line[0]].cmd(line[2]) + self.log.debug("Return value is: {}".format(return_value)) + if return_value != "0" and line[1] == "": + self.log.error("Failed to execute command '{}' on {}, return code: {}".format(line[2],line[0], return_value)) + return False + elif return_value == "0" and line[1] == "!": + self.log.error("Succeded to execute command '{}' on {}, return code: {}".format(line[2],line[0],return_value)) + return False + else: + self.log.debug("Command '{}' on {} returned with: {}".format(line[2],line[0],return_value)) -if __name__ == "__main__": - import argparse + def virtual_environ_stop(self): + for name in self.virtual_environ.machine_names: + self.virtual_machines[name].shutdown() + self.virtual_machines[name].revert_snapshot() + self.virtual_machines[name].undefine() - parser = argparse.ArgumentParser() + for name in self.virtual_environ.network_names: + self.virtual_networks[name].undefine() - parser.add_argument("-d", "--directory", dest="dir") - args = parser.parse_args() - _recipe = recipe("/home/jonatan/python-testing-kvm/test/recipe") - currenttest = test(args.dir) - currenttest.read_settings() - currenttest.virtual_environ_setup() \ No newline at end of file