import xml.etree.ElementTree as ET
-import inspect
import os
import configparser
class vm():
- def __init__(self, vm_xml_file, snapshot_xml_file, image, root_uid):
+ def __init__(self, vm_xml_file, snapshot_xml_file, image, root_uid, username, password):
self.log = log(4)
self.con = libvirt_con("qemu:///system")
try:
self.image = image
if not os.path.isfile(self.image):
- self.log.error("No such file: {}".format(self.settings_file))
+ self.log.error("No such file: {}".format(self.image))
self.root_uid = root_uid
+ self.username = username
+ self.password = password
+
def define(self):
self.dom = self.con.con.defineXML(self.vm_xml)
if self.dom == None:
raise BaseException
def revert_snapshot(self):
- print(inspect.getmembers(self.dom, predicate=inspect.ismethod))
self.dom.revertToSnapshot(self.snapshot)
- #self.dom.SnapshotDelete(self.snapshot)
+ self.snapshot.delete()
def is_running(self):
#serial_con.close()
- def login(self, username, password):
+ def login(self):
try:
- self.serial_con = connection(self.get_serial_device(), username="root")
- self.serial_con.login("25814@root")
+ self.serial_con = connection(self.get_serial_device(), username=self.username)
+ self.serial_con.login(self.password)
except BaseException as e:
self.log.error("Could not connect to the domain via serial console")
return self.serial_con.command(cmd)
-
-
-
-# try:
-# dom = conn.lookupByUUIDString(uuid)
-# except:
-# flash(u"Failed to get the domain object", 'alert-danger')
-# print('Failed to get the domain object', file=sys.stderr)
-# conn.close()
-# return redirect("/vm")
-
-# domname = dom.name()
-# if action == "start":
-# try:
-# dom.create()
-# except:
-# flash(u"Can not boot guest domain.", 'alert-danger')
-# print('Can not boot guest domain.', file=sys.stderr)
-# conn.close()
-# return redirect("/vm")
-
-# flash(u"Sucessfully started Domain \"{}\"".format(domname), 'alert-info')
-# conn.close()
-# return redirect("/vm")
-
-# elif action == "shutdown":
-# try:
-# dom.shutdown()
-# except:
-# flash(u"Can not shutdown guest domain.", 'alert-danger')
-# print('Can not shutdown guest domain.', file=sys.stderr)
-# conn.close()
-# return redirect("/vm")
-
-# flash(u"Sucessfully shutdowned Domain \"{}\"".format(domname), 'alert-info')
-# conn.close()
-# return redirect("/vm")
-
-# elif action == "destroy":
-# try:
-# dom.destroy()
-# except:
-# flash(u"Can not destroy guest domain.", 'alert-danger')
-# print('Can not destroy guest domain.', file=sys.stderr)
-# conn.close()
-# return redirect("/vm")
-
-# flash(u"Sucessfully destroyed Domain \"{}\"".format(domname), 'alert-info')
-# conn.close()
-# return redirect("/vm")
-
-# elif action == "pause":
-# try:
-# dom.suspend()
-# except:
-# flash(u"Can not pause guest domain.", 'alert-danger')
-# print('Can not pause guest domain.', file=sys.stderr)
-# conn.close()
-# return redirect("/vm")
-
-# flash(u"Sucessfully paused Domain \"{}\"".format(domname), 'alert-info')
-# conn.close()
-# return redirect("/vm")
-
-# elif action == "resume":
-# try:
-# dom.resume()
-# except:
-# flash(u"Can not eesume guest domain.:", 'alert-danger')
-# print('Can not resume guest domain.', file=sys.stderr)
-# conn.close()
-# return redirect("/vm")
-
-# flash(u"Sucessfully resumed Domain \"{}\"".format(domname), 'alert-info')
-# conn.close()
-# return redirect("/vm")
-
-# else:
-# flash(u"No such action: \"{}\"".format(action), 'alert-warning')
-# conn.close()
-# return redirect("/vm")
-
-
-# @vms.route('/vm')
-# @login_required
-# def vm_overview():
-# import sys
-# import libvirt
-# conn = libvirt.open('qemu:///system')
-# doms = conn.listAllDomains(0)
-# domains = []
-
-# if len(doms) != 0:
-# for dom in doms:
-# domain = {}
-# domain.setdefault("name", dom.name())
-# domain.setdefault("uuid", dom.UUIDString())
-# state, reason = dom.state()
-# domain.setdefault("state", dom_state(state))
-# domains.append(domain)
-
-# conn.close()
-
-# return render_template("virtberry_vm_basic-vm.html", domains=domains)
-
-
-
class connection():
def __init__(self, device, username=None):
self.buffer = b""
self.username = username
self.log = log(1)
self.con = serial.Serial(device)
- # # Just press enter one time to see what we get
- # self.con.write(b'\n')
- # # We get two new lines \r\n ?
- # data = self.readline()
- # self.log_console_line(data.decode())
-
-
- # if not self.back_at_prompt():
- # self.log.debug("We need to login")
- # if not self.login(password):
- # self.log.error("Login failed")
- # return False
- # else:
- # self.log.debug("We are logged in")
-
-
- ''' in_waiting_before = 0
- sleep(1)
-
- while in_waiting_before != self.con.in_waiting:
- in_waiting_before = self.con.in_waiting
- sleep(0.5)
-
- print(self.con.in_waiting)
- data = self.con.read(self.con.in_waiting)
- print(data)
- print(data.decode(),end='')
-
- string = 'root\n'
- self.con.write(string.encode())
- self.con.flush() '''
-
- ''' in_waiting_before = 0
- sleep(1)
-
- while in_waiting_before != self.con.in_waiting:
- in_waiting_before = self.con.in_waiting
- sleep(0.5)
-
- print(self.con.in_waiting)
- data = self.con.read(self.con.in_waiting)
- print(data)
- print(data.decode(), end='')
-
- string = '25814@root\n'
- self.con.write(string.encode())
- self.con.flush()
-
- in_waiting_before = 0
- sleep(1)
-
- while in_waiting_before != self.con.in_waiting:
- in_waiting_before = self.con.in_waiting
- sleep(0.5)
-
- print(self.con.in_waiting)
- data = self.con.read(self.con.in_waiting)
- print(data)
- print(data.decode(), end='') '''
-
- # check if we already logged in
- # If we we get something like [root@localhost ~]#
- #self.readline()
-
- # if not self.check_logged_in(username):
- #print("Try to login")
- #if self.login(username, password):
- # print("Could not login")
- # return False
-
- #pattern = "^\[" + username + "@.+\]#"
- #print(pattern)
- #data = self.readline(pattern=pattern)
- #if data["return-code"] == 1:
- # print("We are logged in")
- # else:
- # print("We are not logged in")
- # login
-
- #while 1:
- #data = self.readline("^.*login:")
- # if data["return-code"] == 1:
- # break
-
- # string = 'cd / && ls \n'
- # self.con.write(string.encode())
- # self.con.flush()
- # #print(self.con.read(5))
-
- # data = self.readline()
- # self.log_console_line(data.decode())
-
- # while not self.back_at_prompt():
- # data = self.readline()
- # self.log_console_line(data.decode())
-
- ''' in_waiting_before = 0
- sleep(1)
-
- while in_waiting_before != self.con.in_waiting:
- in_waiting_before = self.con.in_waiting
- sleep(0.5)
-
- print(self.con.in_waiting)
- data = self.con.read(self.con.in_waiting)
- data = data.decode()
-
- pattern = "^\[" + username + "@.+\]# $"
- pattern = re.compile(pattern, re.MULTILINE)
- if pattern.match(data, re.MULTILINE):
- print("It works")
-
- print(data, end='') '''
-
-
- #@property
- #def con(self):
- # return self.con
-
def read(self, size=1):
if len(self.buffer) >= size:
return self.con.in_waiting
+ def line_in_buffer(self):
+ if b"\n" in self.buffer:
+ return True
+
+ return False
def readline2(self, pattern=None):
string = ""
print("We are not logged in")
return False
+ def print_lines_in_buffer(self):
+ while True:
+ self.log.debug("Fill buffer ...")
+ self.peek(len(self.buffer) + self.in_waiting)
+ self.log.debug("Current buffer length: {}".format(len(self.buffer)))
+ if self.line_in_buffer() == True:
+ while self.line_in_buffer() == True:
+ data = self.readline()
+ self.log_console_line(data.decode())
+ else:
+ self.log.debug("We have printed all lines in the buffer")
+ break
+
def login(self, password):
if self.username == None:
self.log.error("Username cannot be blank")
return False
+ self.print_lines_in_buffer()
+
# Hit enter to see what we get
self.con.write(b'\n')
# We get two new lines \r\n ?
data = self.readline()
self.log_console_line(data.decode())
+ self.print_lines_in_buffer()
if self.back_at_prompt():
self.log.debug("We are already logged in.")
self.log_console_line(data.decode())
-# while True:
-
-# line = self.readline()
-
-# print (line)
-
-# print("Hello")
-
-# print("World")
-
-# Hello
-# World
-
-# Hello World
-
-# # Peek for prompt?
-# if self.back_at_prompt():
-# break
-
-# def back_at_prompt():
-# data = self.peek()
-
-# if not char == "[":
-# return False
-
-# data = self.peek(in_waiting)
-# m = re.search(..., data)
-# if m:
-# return True
-
-
-
-
-
-# pattern = r"^\[root@.+\]#"
-
-# pattern = re.compile(pattern, re.MULTILINE)
-
-# data = """cd / && ls
-# bin dev home lib64 media opt root sbin sys usr
-# boot etc lib lost+found mnt proc run srv tmp var
-# [root@localhost /]# """
-
-# #data = "[root@localhost /]# "
-# data2 = pattern.search(data)
-
-# #if pattern.search(data):
-# # print("get here")
-
-# #print(data2.group())
-
-
+# A class which define and undefine a virtual network based on an xml file
+class network():
+ def __init__(self, network_xml_file):
+ self.log = log(4)
+ self.con = libvirt_con("qemu:///system")
+ try:
+ with open(network_xml_file) as fobj:
+ self.network_xml = fobj.read()
+ except FileNotFoundError as error:
+ self.log.error("No such file: {}".format(vm_xml_file))
-# vm = vm("qemu:///system")
+ def define(self):
+ self.network = self.con.con.networkDefineXML(self.network_xml)
-# dom = vm.domain_start_from_xml_file("/home/jonatan/python-testing-kvm/centos2.xml")
+ if network == None:
+ self.log.error("Failed to define virtual network")
-# # This block till the vm is booted
-# print("Waiting till the domain is booted up")
-# vm.check_domain_is_booted_up(dom)
-# print("Domain is booted up")
-# #vm.domain_get_serial_device(dom)
-# #vm.domain_get_serial_device(dom)
+ def start(self):
+ self.network.create()
-# #centos2 = vm("/home/jonatan/python-testing-kvm/centos2.xml", "/home/jonatan/python-testing-kvm/centos2-snapshot.xml")
+ def undefine(self):
+ self.network.destroy()
-# centos2.define()
-# centos2.create_snapshot()
-# centos2.start()
-# #centos2.check_is_booted_up()
-# centos2.login("root", "25814@root")
-# centos2.cmd("echo 1 > test3")
-# #versuch1 = connection(centos2.get_serial_device, username="root")
-# #versuch1.login()
-# #versuch1.command("cd / && ls")
+class RecipeExeption(Exception):
+ pass
-# input("Press Enter to continue...")
-# centos2.shutdown()
-# centos2.revert_snapshot()
-# centos2.undefine()
-# A class which define and undefine a virtual network based on an xml file
-class network():
- def __init__(self, path):
- self.log = log(4)
- self.log.debug("Path is: {}".format(path))
# Should read the test, check if the syntax are valid
# and return tuples with the ( host, command ) structure
def __init__(self, path):
self.log = log(4)
self.recipe_file = path
+ self._recipe = None
+
if not os.path.isfile(self.recipe_file):
self.log.error("No such file: {}".format(self.recipe_file))
except FileNotFoundError as error:
self.log.error("No such file: {}".format(vm_xml_file))
- for line in self.raw_recipe:
- print(line)
+ @property
+ def recipe(self):
+ if not self._recipe:
+ self.parse()
+
+ return self._recipe
+ def parse(self):
+ self._recipe = []
+ i = 1
+ for line in self.raw_recipe:
+ raw_line = line.split(":")
+ if len(raw_line) < 2:
+ self.log.error("Error parsing the recipe in line {}".format(i))
+ raise RecipeExeption
+ cmd = raw_line[1]
+ raw_line = raw_line[0].strip().split(" ")
+ if len(raw_line) == 0:
+ self.log.error("Failed to parse the recipe in line {}".format(i))
+ raise RecipeExeption
+ elif len(raw_line) == 1:
+ if raw_line[0] == "":
+ self.log.error("Failed to parse the recipe in line {}".format(i))
+ raise RecipeExeption
+ machine = raw_line[0]
+ extra = ""
+ elif len(raw_line) == 2:
+ machine = raw_line[0]
+ extra = raw_line[1]
+
+ self._recipe.append((machine.strip(), extra.strip(), cmd.strip()))
+ i = i + 1
class test():
self.virtual_machines = self.virtual_environ.get_machines()
def virtual_environ_start(self):
- pass
+ for name in self.virtual_environ.network_names:
+ self.virtual_networks[name].define()
+ self.virtual_networks[name].start()
- def load_recipe(self):
- pass
-
- def run_recipe():
- pass
-
- def virtual_environ_stop():
- pass
+ for name in self.virtual_environ.machine_names:
+ self.virtual_machines[name].define()
+ self.virtual_machines[name].create_snapshot()
+ self.virtual_machines[name].start()
+ self.log.debug("Try to login on all machines")
+ for name in self.virtual_environ.machine_names:
+ self.virtual_machines[name].login()
+ def load_recipe(self):
+ try:
+ self.recipe = recipe(self.recipe_file)
+ except BaseException:
+ self.log.error("Failed to load recipe")
+ exit(1)
+
+ def run_recipe(self):
+ for line in self.recipe.recipe:
+ return_value = self.virtual_machines[line[0]].cmd(line[2])
+ if not return_value and line[1] == "":
+ self.log.error("Failed to execute command '{}' on {}".format(line[2],line[0]))
+ return False
+ elif return_value == True and line[1] == "!":
+ self.log.error("Succeded to execute command '{}' on {}".format(line[2],line[0]))
+ return False
+
+ def virtual_environ_stop(self):
+ for name in self.virtual_environ.machine_names:
+ self.virtual_machines[name].shutdown()
+ self.virtual_machines[name].revert_snapshot()
+ self.virtual_machines[name].undefine()
+
+ for name in self.virtual_environ.network_names:
+ self.virtual_networks[name].undefine()
# Should return all vms and networks in a list
self.log.debug(_machine)
machines.setdefault(_machine, vm(
os.path.normpath(self.path + "/" + self.config[_machine]["xml_file"]),
- os.path.normpath(self.path + "/" + self.config[_machine]["snapshot_xml_file"])))
+ os.path.normpath(self.path + "/" + self.config[_machine]["snapshot_xml_file"]),
+ self.config[_machine]["image"],
+ self.config[_machine]["root_uid"],
+ self.config[_machine]["username"],
+ self.config[_machine]["password"]))
return machines
+ @property
+ def machine_names(self):
+ return self.machines
-
-
-
-# def command(self, command):
-# self._send_command(command)
-
-# while True:
-# line = self.readline()
-
-# print (line)
-
-# print("Hello")
-
-# print("World")
-
-# Hello
-# World
-
-# Hello World
-
-# # Peek for prompt?
-# if self.back_at_prompt():
-# break
-
-# def back_at_prompt():
-# data = self.peek()
-
-# if not char == "[":
-# return False
-
-# data = self.peek(in_waiting)
-# m = re.search(..., data)
-# if m:
-# return True
-
-
-
-
-
-# class connection()
-# buffer = b""
-
-# def read(self, size=1):
-# if len(buffer) >= size:
-# # throw away first size bytes in buffer
-# data, buffer = buffer[:size], buffer[size:]
-# return data
-
-# return self.serial.read(size)
-
-# def peek(self, size=1):
-# if len(buffer) <= size:
-# buffer += self.serial.read(size=size - len(buffer))
-
-# return buffer[:size]
-
-
-# def readline(self):
-# buffer = buffer + self.serial.read(in_wating)
-# if "\n" in buffer:
-# return alle zeichen bis zum \n
-
-# return buffer + self.serial.readline()
+ @property
+ def network_names(self):
+ return self.networks
if __name__ == "__main__":
args = parser.parse_args()
- _recipe = recipe("/home/jonatan/python-testing-kvm/test/recipe")
currenttest = test(args.dir)
currenttest.read_settings()
- currenttest.virtual_environ_setup()
\ No newline at end of file
+ currenttest.virtual_environ_setup()
+ currenttest.load_recipe()
+ currenttest.virtual_environ_start()
+ currenttest.run_recipe()
+ currenttest.virtual_environ_stop()
\ No newline at end of file