]>
git.ipfire.org Git - nitsi.git/blob - test.py
11 import xml
.etree
.ElementTree
as ET
19 def __init__(self
, log_level
):
20 self
.log_level
= log_level
22 def debug(self
, string
):
23 if self
.log_level
>= 4:
24 print("DEBUG: {}".format(string
))
26 def error(self
, string
):
27 print("ERROR: {}".format(string
))
30 def __init__(self
, uri
):
33 self
.connection
= None
35 def get_domain_from_name(self
, name
):
36 dom
= self
.con
.lookupByName(name
)
44 if self
.connection
== None:
46 self
.connection
= libvirt
.open(self
.uri
)
47 except BaseException
as error
:
48 self
.log
.error("Could not connect to: {}".format(self
.uri
))
50 self
.log
.debug("Connected to: {}".format(self
.uri
))
51 return self
.connection
53 return self
.connection
57 def __init__(self
, vm_xml_file
, snapshot_xml_file
, image
, root_uid
):
59 self
.con
= libvirt_con("qemu:///system")
61 with
open(vm_xml_file
) as fobj
:
62 self
.vm_xml
= fobj
.read()
63 except FileNotFoundError
as error
:
64 self
.log
.error("No such file: {}".format(vm_xml_file
))
67 with
open(snapshot_xml_file
) as fobj
:
68 self
.snapshot_xml
= fobj
.read()
69 except FileNotFoundError
as error
:
70 self
.log
.error("No such file: {}".format(snapshot_xml_file
))
74 if not os
.path
.isfile(self
.image
):
75 self
.log
.error("No such file: {}".format(self
.settings_file
))
77 self
.root_uid
= root_uid
80 self
.dom
= self
.con
.con
.defineXML(self
.vm_xml
)
82 self
.log
.error("Could not define VM")
86 if self
.dom
.create() < 0:
87 self
.log
.error("Could not start VM")
92 if self
.dom
.shutdown() < 0:
93 self
.log
.error("Could not shutdown VM")
96 self
.log
.error("Domain is not running")
101 def create_snapshot(self
):
103 self
.snapshot
= self
.dom
.snapshotCreateXML(self
.snapshot_xml
)
105 if not self
.snapshot
:
106 self
.log
.error("Could not create snapshot")
109 def revert_snapshot(self
):
110 print(inspect
.getmembers(self
.dom
, predicate
=inspect
.ismethod
))
111 self
.dom
.revertToSnapshot(self
.snapshot
)
112 #self.dom.SnapshotDelete(self.snapshot)
114 def is_running(self
):
116 state
, reason
= self
.dom
.state()
118 if state
== libvirt
.VIR_DOMAIN_RUNNING
:
123 def get_serial_device(self
):
125 if not self
.is_running():
128 xml_root
= ET
.fromstring(self
.dom
.XMLDesc(0))
130 elem
= xml_root
.find("./devices/serial/source")
131 return elem
.get("path")
133 def check_is_booted_up(self
):
134 serial_con
= connection(self
.get_serial_device())
136 serial_con
.write("\n")
137 # This will block till the domain is booted up
142 def login(self
, username
, password
):
144 self
.serial_con
= connection(self
.get_serial_device(), username
="root")
145 self
.serial_con
.login("25814@root")
146 except BaseException
as e
:
147 self
.log
.error("Could not connect to the domain via serial console")
150 return self
.serial_con
.command(cmd
)
157 # dom = conn.lookupByUUIDString(uuid)
159 # flash(u"Failed to get the domain object", 'alert-danger')
160 # print('Failed to get the domain object', file=sys.stderr)
162 # return redirect("/vm")
164 # domname = dom.name()
165 # if action == "start":
169 # flash(u"Can not boot guest domain.", 'alert-danger')
170 # print('Can not boot guest domain.', file=sys.stderr)
172 # return redirect("/vm")
174 # flash(u"Sucessfully started Domain \"{}\"".format(domname), 'alert-info')
176 # return redirect("/vm")
178 # elif action == "shutdown":
182 # flash(u"Can not shutdown guest domain.", 'alert-danger')
183 # print('Can not shutdown guest domain.', file=sys.stderr)
185 # return redirect("/vm")
187 # flash(u"Sucessfully shutdowned Domain \"{}\"".format(domname), 'alert-info')
189 # return redirect("/vm")
191 # elif action == "destroy":
195 # flash(u"Can not destroy guest domain.", 'alert-danger')
196 # print('Can not destroy guest domain.', file=sys.stderr)
198 # return redirect("/vm")
200 # flash(u"Sucessfully destroyed Domain \"{}\"".format(domname), 'alert-info')
202 # return redirect("/vm")
204 # elif action == "pause":
208 # flash(u"Can not pause guest domain.", 'alert-danger')
209 # print('Can not pause guest domain.', file=sys.stderr)
211 # return redirect("/vm")
213 # flash(u"Sucessfully paused Domain \"{}\"".format(domname), 'alert-info')
215 # return redirect("/vm")
217 # elif action == "resume":
221 # flash(u"Can not eesume guest domain.:", 'alert-danger')
222 # print('Can not resume guest domain.', file=sys.stderr)
224 # return redirect("/vm")
226 # flash(u"Sucessfully resumed Domain \"{}\"".format(domname), 'alert-info')
228 # return redirect("/vm")
231 # flash(u"No such action: \"{}\"".format(action), 'alert-warning')
233 # return redirect("/vm")
241 # conn = libvirt.open('qemu:///system')
242 # doms = conn.listAllDomains(0)
248 # domain.setdefault("name", dom.name())
249 # domain.setdefault("uuid", dom.UUIDString())
250 # state, reason = dom.state()
251 # domain.setdefault("state", dom_state(state))
252 # domains.append(domain)
256 # return render_template("virtberry_vm_basic-vm.html", domains=domains)
261 def __init__(self
, device
, username
=None):
263 self
.back_at_prompt_pattern
= None
264 self
.username
= username
266 self
.con
= serial
.Serial(device
)
267 # # Just press enter one time to see what we get
268 # self.con.write(b'\n')
269 # # We get two new lines \r\n ?
270 # data = self.readline()
271 # self.log_console_line(data.decode())
274 # if not self.back_at_prompt():
275 # self.log.debug("We need to login")
276 # if not self.login(password):
277 # self.log.error("Login failed")
280 # self.log.debug("We are logged in")
283 ''' in_waiting_before = 0
286 while in_waiting_before != self.con.in_waiting:
287 in_waiting_before = self.con.in_waiting
290 print(self.con.in_waiting)
291 data = self.con.read(self.con.in_waiting)
293 print(data.decode(),end='')
296 self.con.write(string.encode())
299 ''' in_waiting_before = 0
302 while in_waiting_before != self.con.in_waiting:
303 in_waiting_before = self.con.in_waiting
306 print(self.con.in_waiting)
307 data = self.con.read(self.con.in_waiting)
309 print(data.decode(), end='')
311 string = '25814@root\n'
312 self.con.write(string.encode())
315 in_waiting_before = 0
318 while in_waiting_before != self.con.in_waiting:
319 in_waiting_before = self.con.in_waiting
322 print(self.con.in_waiting)
323 data = self.con.read(self.con.in_waiting)
325 print(data.decode(), end='') '''
327 # check if we already logged in
328 # If we we get something like [root@localhost ~]#
331 # if not self.check_logged_in(username):
332 #print("Try to login")
333 #if self.login(username, password):
334 # print("Could not login")
337 #pattern = "^\[" + username + "@.+\]#"
339 #data = self.readline(pattern=pattern)
340 #if data["return-code"] == 1:
341 # print("We are logged in")
343 # print("We are not logged in")
347 #data = self.readline("^.*login:")
348 # if data["return-code"] == 1:
351 # string = 'cd / && ls \n'
352 # self.con.write(string.encode())
354 # #print(self.con.read(5))
356 # data = self.readline()
357 # self.log_console_line(data.decode())
359 # while not self.back_at_prompt():
360 # data = self.readline()
361 # self.log_console_line(data.decode())
363 ''' in_waiting_before = 0
366 while in_waiting_before != self.con.in_waiting:
367 in_waiting_before = self.con.in_waiting
370 print(self.con.in_waiting)
371 data = self.con.read(self.con.in_waiting)
374 pattern = "^\[" + username + "@.+\]# $"
375 pattern = re.compile(pattern, re.MULTILINE)
376 if pattern.match(data, re.MULTILINE):
379 print(data, end='') '''
387 def read(self
, size
=1):
388 if len(self
.buffer) >= size
:
389 # throw away first size bytes in buffer
390 data
= self
.buffer[:size
]
391 # Set the buffer to the non used bytes
392 self
.buffer = self
.buffer[size
:]
396 # Set the size to the value we have to read now
397 size
= size
- len(self
.buffer)
398 # Set the buffer empty
400 return data
+ self
.con
.read(size
)
402 def peek(self
, size
=1):
403 if len(self
.buffer) <= size
:
404 self
.buffer += self
.con
.read(size
=size
- len(self
.buffer))
406 return self
.buffer[:size
]
409 self
.log
.debug(self
.buffer)
410 self
.buffer = self
.buffer + self
.con
.read(self
.con
.in_waiting
)
411 if b
"\n" in self
.buffer:
412 size
= self
.buffer.index(b
"\n") + 1
413 self
.log
.debug("We have a whole line in the buffer")
414 self
.log
.debug(self
.buffer)
415 self
.log
.debug("We split at {}".format(size
))
416 data
= self
.buffer[:size
]
417 self
.buffer = self
.buffer[size
:]
419 self
.log
.debug(self
.buffer)
424 return data
+ self
.con
.readline()
426 def back_at_prompt(self
):
431 # We need to use self.in_waiting because with self.con.in_waiting we get
432 # not the complete string
433 size
= len(self
.buffer) + self
.in_waiting
434 data
= self
.peek(size
)
437 if self
.back_at_prompt_pattern
== None:
438 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
439 self
.back_at_prompt_pattern
= re
.compile(r
"^\[{}@.+\]#".format(self
.username
), re
.MULTILINE
)
441 if self
.back_at_prompt_pattern
.search(data
.decode()):
446 def log_console_line(self
, line
):
447 self
.log
.debug("Get in function log_console_line()")
448 sys
.stdout
.write(line
)
451 def in_waiting(self
):
452 in_waiting_before
= 0
455 while in_waiting_before
!= self
.con
.in_waiting
:
456 in_waiting_before
= self
.con
.in_waiting
459 return self
.con
.in_waiting
462 def readline2(self
, pattern
=None):
466 pattern
= re
.compile(pattern
)
469 char
= self
.con
.read(1)
470 string
= string
+ char
.decode("utf-8")
471 string2
= string2
+ char
473 print(char
.decode("utf-8"), end
="")
476 if pattern
and pattern
.match(string
):
479 return {"string" : string
, "return-code" : 1}
485 return {"return-code" : 0}
487 def check_logged_in(self
, username
):
488 pattern
= "^\[" + username
+ "@.+\]#"
489 data
= self
.readline(pattern
=pattern
)
490 if data
["return-code"] == 1:
491 print("We are logged in")
494 print("We are not logged in")
497 def login(self
, password
):
498 if self
.username
== None:
499 self
.log
.error("Username cannot be blank")
502 # Hit enter to see what we get
503 self
.con
.write(b
'\n')
504 # We get two new lines \r\n ?
505 data
= self
.readline()
506 self
.log_console_line(data
.decode())
509 if self
.back_at_prompt():
510 self
.log
.debug("We are already logged in.")
513 # Read all line till we get login:
516 if not data
.decode() == "l":
517 self
.log
.debug("We get no l at the start")
518 self
.log_console_line(self
.readline().decode())
520 # We need to use self.in_waiting because with self.con.in_waiting we get
521 # not the complete string
522 size
= len(self
.buffer) + self
.in_waiting
523 data
= self
.peek(size
)
525 pattern
= r
"^.*login: "
526 pattern
= re
.compile(pattern
)
528 if pattern
.search(data
.decode()):
531 self
.log
.debug("The pattern does not match")
532 self
.log_console_line(self
.readline().decode())
535 string
= "{}\n".format(self
.username
)
536 self
.con
.write(string
.encode())
538 # read the login out of the buffer
539 data
= self
.readline()
540 self
.log
.debug("This is the login:{}".format(data
))
541 self
.log_console_line(data
.decode())
543 # We need to wait her till we get the full string "Password:"
544 #This is useless but self.in_waiting will wait the correct amount of time
545 size
= self
.in_waiting
547 string
= "{}\n".format(password
)
548 self
.con
.write(string
.encode())
550 # Print the 'Password:' line
551 data
= self
.readline()
552 self
.log_console_line(data
.decode())
554 while not self
.back_at_prompt():
555 # This will fail if the login failed so we need to look for the failed keyword
556 data
= self
.readline()
557 self
.log_console_line(data
.decode())
561 def write(self
, string
):
562 self
.log
.debug(string
)
563 self
.con
.write(string
.encode())
566 def command(self
, command
):
567 self
.write("{}\n".format(command
))
569 # We need to read out the prompt for this command first
570 # If we do not do this we will break the loop immediately
571 # because the prompt for this command is still in the buffer
572 data
= self
.readline()
573 self
.log_console_line(data
.decode())
575 while not self
.back_at_prompt():
576 data
= self
.readline()
577 self
.log_console_line(data
.decode())
582 # line = self.readline()
596 # if self.back_at_prompt():
599 # def back_at_prompt():
602 # if not char == "[":
605 # data = self.peek(in_waiting)
606 # m = re.search(..., data)
614 # pattern = r"^\[root@.+\]#"
616 # pattern = re.compile(pattern, re.MULTILINE)
618 # data = """cd / && ls
619 # bin dev home lib64 media opt root sbin sys usr
620 # boot etc lib lost+found mnt proc run srv tmp var
621 # [root@localhost /]# """
623 # #data = "[root@localhost /]# "
624 # data2 = pattern.search(data)
626 # #if pattern.search(data):
627 # # print("get here")
629 # #print(data2.group())
633 # vm = vm("qemu:///system")
635 # dom = vm.domain_start_from_xml_file("/home/jonatan/python-testing-kvm/centos2.xml")
637 # # This block till the vm is booted
638 # print("Waiting till the domain is booted up")
639 # vm.check_domain_is_booted_up(dom)
640 # print("Domain is booted up")
641 # #vm.domain_get_serial_device(dom)
642 # #vm.domain_get_serial_device(dom)
644 # #centos2 = vm("/home/jonatan/python-testing-kvm/centos2.xml", "/home/jonatan/python-testing-kvm/centos2-snapshot.xml")
647 # centos2.create_snapshot()
649 # #centos2.check_is_booted_up()
650 # centos2.login("root", "25814@root")
651 # centos2.cmd("echo 1 > test3")
654 # #versuch1 = connection(centos2.get_serial_device, username="root")
656 # #versuch1.command("cd / && ls")
658 # input("Press Enter to continue...")
660 # centos2.revert_snapshot()
663 # A class which define and undefine a virtual network based on an xml file
665 def __init__(self
, path
):
667 self
.log
.debug("Path is: {}".format(path
))
669 # Should read the test, check if the syntax are valid
670 # and return tuples with the ( host, command ) structure
672 def __init__(self
, path
):
674 self
.recipe_file
= path
675 if not os
.path
.isfile(self
.recipe_file
):
676 self
.log
.error("No such file: {}".format(self
.recipe_file
))
679 with
open(self
.recipe_file
) as fobj
:
680 self
.raw_recipe
= fobj
.readlines()
681 except FileNotFoundError
as error
:
682 self
.log
.error("No such file: {}".format(vm_xml_file
))
684 for line
in self
.raw_recipe
:
690 def __init__(self
, path
):
693 self
.path
= os
.path
.abspath(path
)
694 except BaseException
as e
:
695 self
.log
.error("Could not get absolute path")
697 self
.log
.debug(self
.path
)
699 self
.settings_file
= "{}/settings".format(self
.path
)
700 if not os
.path
.isfile(self
.settings_file
):
701 self
.log
.error("No such file: {}".format(self
.settings_file
))
703 self
.recipe_file
= "{}/recipe".format(self
.path
)
704 if not os
.path
.isfile(self
.recipe_file
):
705 self
.log
.error("No such file: {}".format(self
.recipe_file
))
707 def read_settings(self
):
708 self
.config
= configparser
.ConfigParser()
709 self
.config
.read(self
.settings_file
)
710 self
.name
= self
.config
["DEFAULT"]["Name"]
711 self
.description
= self
.config
["DEFAULT"]["Description"]
713 self
.virtual_environ_name
= self
.config
["VIRTUAL_ENVIRONMENT"]["Name"]
714 self
.virtual_environ_path
= self
.config
["VIRTUAL_ENVIRONMENT"]["Path"]
715 self
.virtual_environ_path
= os
.path
.normpath(self
.path
+ "/" + self
.virtual_environ_path
)
717 def virtual_environ_setup(self
):
718 self
.virtual_environ
= virtual_environ(self
.virtual_environ_path
)
720 self
.virtual_networks
= self
.virtual_environ
.get_networks()
722 self
.virtual_machines
= self
.virtual_environ
.get_machines()
724 def virtual_environ_start(self
):
727 def load_recipe(self
):
733 def virtual_environ_stop():
739 # Should return all vms and networks in a list
740 # and should provide the path to the necessary xml files
741 class virtual_environ():
742 def __init__(self
, path
):
745 self
.path
= os
.path
.abspath(path
)
746 except BaseException
as e
:
747 self
.log
.error("Could not get absolute path")
749 self
.log
.debug(self
.path
)
751 self
.settings_file
= "{}/settings".format(self
.path
)
752 if not os
.path
.isfile(self
.settings_file
):
753 self
.log
.error("No such file: {}".format(self
.settings_file
))
755 self
.log
.debug(self
.settings_file
)
756 self
.config
= configparser
.ConfigParser()
757 self
.config
.read(self
.settings_file
)
758 self
.name
= self
.config
["DEFAULT"]["name"]
759 self
.machines_string
= self
.config
["DEFAULT"]["machines"]
760 self
.networks_string
= self
.config
["DEFAULT"]["networks"]
763 for machine
in self
.machines_string
.split(","):
764 self
.machines
.append(machine
.strip())
767 for network
in self
.networks_string
.split(","):
768 self
.networks
.append(network
.strip())
770 self
.log
.debug(self
.machines
)
771 self
.log
.debug(self
.networks
)
773 def get_networks(self
):
775 for _network
in self
.networks
:
776 self
.log
.debug(_network
)
777 networks
.setdefault(_network
, network(os
.path
.normpath(self
.path
+ "/" + self
.config
[_network
]["xml_file"])))
780 def get_machines(self
):
782 for _machine
in self
.machines
:
783 self
.log
.debug(_machine
)
784 machines
.setdefault(_machine
, vm(
785 os
.path
.normpath(self
.path
+ "/" + self
.config
[_machine
]["xml_file"]),
786 os
.path
.normpath(self
.path
+ "/" + self
.config
[_machine
]["snapshot_xml_file"])))
794 # def command(self, command):
795 # self._send_command(command)
798 # line = self.readline()
812 # if self.back_at_prompt():
815 # def back_at_prompt():
818 # if not char == "[":
821 # data = self.peek(in_waiting)
822 # m = re.search(..., data)
833 # def read(self, size=1):
834 # if len(buffer) >= size:
835 # # throw away first size bytes in buffer
836 # data, buffer = buffer[:size], buffer[size:]
839 # return self.serial.read(size)
841 # def peek(self, size=1):
842 # if len(buffer) <= size:
843 # buffer += self.serial.read(size=size - len(buffer))
845 # return buffer[:size]
848 # def readline(self):
849 # buffer = buffer + self.serial.read(in_wating)
851 # return alle zeichen bis zum \n
853 # return buffer + self.serial.readline()
856 if __name__
== "__main__":
859 parser
= argparse
.ArgumentParser()
861 parser
.add_argument("-d", "--directory", dest
="dir")
863 args
= parser
.parse_args()
865 _recipe
= recipe("/home/jonatan/python-testing-kvm/test/recipe")
866 currenttest
= test(args
.dir)
867 currenttest
.read_settings()
868 currenttest
.virtual_environ_setup()