]>
git.ipfire.org Git - nitsi.git/blob - test.py
11 import xml
.etree
.ElementTree
as ET
18 def __init__(self
, log_level
):
19 self
.log_level
= log_level
21 def debug(self
, string
):
22 if self
.log_level
>= 4:
23 print("DEBUG: {}".format(string
))
25 def error(self
, string
):
26 print("ERROR: {}".format(string
))
29 def __init__(self
, uri
):
32 self
.connection
= None
34 def get_domain_from_name(self
, name
):
35 dom
= self
.con
.lookupByName(name
)
43 if self
.connection
== None:
45 self
.connection
= libvirt
.open(self
.uri
)
46 except BaseException
as error
:
47 self
.log
.error("Could not connect to: {}".format(self
.uri
))
49 self
.log
.debug("Connected to: {}".format(self
.uri
))
50 return self
.connection
52 return self
.connection
56 def __init__(self
, vm_xml_file
, snapshot_xml_file
, image
, root_uid
, username
, password
):
58 self
.con
= libvirt_con("qemu:///system")
60 with
open(vm_xml_file
) as fobj
:
61 self
.vm_xml
= fobj
.read()
62 except FileNotFoundError
as error
:
63 self
.log
.error("No such file: {}".format(vm_xml_file
))
66 with
open(snapshot_xml_file
) as fobj
:
67 self
.snapshot_xml
= fobj
.read()
68 except FileNotFoundError
as error
:
69 self
.log
.error("No such file: {}".format(snapshot_xml_file
))
73 if not os
.path
.isfile(self
.image
):
74 self
.log
.error("No such file: {}".format(self
.image
))
76 self
.root_uid
= root_uid
78 self
.username
= username
79 self
.password
= password
82 self
.dom
= self
.con
.con
.defineXML(self
.vm_xml
)
84 self
.log
.error("Could not define VM")
88 if self
.dom
.create() < 0:
89 self
.log
.error("Could not start VM")
94 if self
.dom
.shutdown() < 0:
95 self
.log
.error("Could not shutdown VM")
98 self
.log
.error("Domain is not running")
103 def create_snapshot(self
):
105 self
.snapshot
= self
.dom
.snapshotCreateXML(self
.snapshot_xml
)
107 if not self
.snapshot
:
108 self
.log
.error("Could not create snapshot")
111 def revert_snapshot(self
):
112 self
.dom
.revertToSnapshot(self
.snapshot
)
113 self
.snapshot
.delete()
115 def is_running(self
):
117 state
, reason
= self
.dom
.state()
119 if state
== libvirt
.VIR_DOMAIN_RUNNING
:
124 def get_serial_device(self
):
126 if not self
.is_running():
129 xml_root
= ET
.fromstring(self
.dom
.XMLDesc(0))
131 elem
= xml_root
.find("./devices/serial/source")
132 return elem
.get("path")
134 def check_is_booted_up(self
):
135 serial_con
= connection(self
.get_serial_device())
137 serial_con
.write("\n")
138 # This will block till the domain is booted up
145 self
.serial_con
= connection(self
.get_serial_device(), username
=self
.username
)
146 self
.serial_con
.login(self
.password
)
147 except BaseException
as e
:
148 self
.log
.error("Could not connect to the domain via serial console")
151 return self
.serial_con
.command(cmd
)
155 def __init__(self
, device
, username
=None):
157 self
.back_at_prompt_pattern
= None
158 self
.username
= username
160 self
.con
= serial
.Serial(device
)
162 def read(self
, size
=1):
163 if len(self
.buffer) >= size
:
164 # throw away first size bytes in buffer
165 data
= self
.buffer[:size
]
166 # Set the buffer to the non used bytes
167 self
.buffer = self
.buffer[size
:]
171 # Set the size to the value we have to read now
172 size
= size
- len(self
.buffer)
173 # Set the buffer empty
175 return data
+ self
.con
.read(size
)
177 def peek(self
, size
=1):
178 if len(self
.buffer) <= size
:
179 self
.buffer += self
.con
.read(size
=size
- len(self
.buffer))
181 return self
.buffer[:size
]
184 self
.log
.debug(self
.buffer)
185 self
.buffer = self
.buffer + self
.con
.read(self
.con
.in_waiting
)
186 if b
"\n" in self
.buffer:
187 size
= self
.buffer.index(b
"\n") + 1
188 self
.log
.debug("We have a whole line in the buffer")
189 self
.log
.debug(self
.buffer)
190 self
.log
.debug("We split at {}".format(size
))
191 data
= self
.buffer[:size
]
192 self
.buffer = self
.buffer[size
:]
194 self
.log
.debug(self
.buffer)
199 return data
+ self
.con
.readline()
201 def back_at_prompt(self
):
206 # We need to use self.in_waiting because with self.con.in_waiting we get
207 # not the complete string
208 size
= len(self
.buffer) + self
.in_waiting
209 data
= self
.peek(size
)
212 if self
.back_at_prompt_pattern
== None:
213 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
214 self
.back_at_prompt_pattern
= re
.compile(r
"^\[{}@.+\]#".format(self
.username
), re
.MULTILINE
)
216 if self
.back_at_prompt_pattern
.search(data
.decode()):
221 def log_console_line(self
, line
):
222 self
.log
.debug("Get in function log_console_line()")
223 sys
.stdout
.write(line
)
226 def in_waiting(self
):
227 in_waiting_before
= 0
230 while in_waiting_before
!= self
.con
.in_waiting
:
231 in_waiting_before
= self
.con
.in_waiting
234 return self
.con
.in_waiting
237 def readline2(self
, pattern
=None):
241 pattern
= re
.compile(pattern
)
244 char
= self
.con
.read(1)
245 string
= string
+ char
.decode("utf-8")
246 string2
= string2
+ char
248 print(char
.decode("utf-8"), end
="")
251 if pattern
and pattern
.match(string
):
254 return {"string" : string
, "return-code" : 1}
260 return {"return-code" : 0}
262 def check_logged_in(self
, username
):
263 pattern
= "^\[" + username
+ "@.+\]#"
264 data
= self
.readline(pattern
=pattern
)
265 if data
["return-code"] == 1:
266 print("We are logged in")
269 print("We are not logged in")
272 def login(self
, password
):
273 if self
.username
== None:
274 self
.log
.error("Username cannot be blank")
277 # Hit enter to see what we get
278 self
.con
.write(b
'\n')
279 # We get two new lines \r\n ?
280 data
= self
.readline()
281 self
.log_console_line(data
.decode())
284 if self
.back_at_prompt():
285 self
.log
.debug("We are already logged in.")
288 # Read all line till we get login:
291 if not data
.decode() == "l":
292 self
.log
.debug("We get no l at the start")
293 self
.log_console_line(self
.readline().decode())
295 # We need to use self.in_waiting because with self.con.in_waiting we get
296 # not the complete string
297 size
= len(self
.buffer) + self
.in_waiting
298 data
= self
.peek(size
)
300 pattern
= r
"^.*login: "
301 pattern
= re
.compile(pattern
)
303 if pattern
.search(data
.decode()):
306 self
.log
.debug("The pattern does not match")
307 self
.log_console_line(self
.readline().decode())
310 string
= "{}\n".format(self
.username
)
311 self
.con
.write(string
.encode())
313 # read the login out of the buffer
314 data
= self
.readline()
315 self
.log
.debug("This is the login:{}".format(data
))
316 self
.log_console_line(data
.decode())
318 # We need to wait her till we get the full string "Password:"
319 #This is useless but self.in_waiting will wait the correct amount of time
320 size
= self
.in_waiting
322 string
= "{}\n".format(password
)
323 self
.con
.write(string
.encode())
325 # Print the 'Password:' line
326 data
= self
.readline()
327 self
.log_console_line(data
.decode())
329 while not self
.back_at_prompt():
330 # This will fail if the login failed so we need to look for the failed keyword
331 data
= self
.readline()
332 self
.log_console_line(data
.decode())
336 def write(self
, string
):
337 self
.log
.debug(string
)
338 self
.con
.write(string
.encode())
341 def command(self
, command
):
342 self
.write("{}\n".format(command
))
344 # We need to read out the prompt for this command first
345 # If we do not do this we will break the loop immediately
346 # because the prompt for this command is still in the buffer
347 data
= self
.readline()
348 self
.log_console_line(data
.decode())
350 while not self
.back_at_prompt():
351 data
= self
.readline()
352 self
.log_console_line(data
.decode())
355 # A class which define and undefine a virtual network based on an xml file
357 def __init__(self
, network_xml_file
):
359 self
.con
= libvirt_con("qemu:///system")
361 with
open(network_xml_file
) as fobj
:
362 self
.network_xml
= fobj
.read()
363 except FileNotFoundError
as error
:
364 self
.log
.error("No such file: {}".format(vm_xml_file
))
367 self
.network
= self
.con
.con
.networkDefineXML(self
.network_xml
)
370 self
.log
.error("Failed to define virtual network")
373 self
.network
.create()
376 self
.network
.destroy()
380 class RecipeExeption(Exception):
385 # Should read the test, check if the syntax are valid
386 # and return tuples with the ( host, command ) structure
388 def __init__(self
, path
):
390 self
.recipe_file
= path
393 if not os
.path
.isfile(self
.recipe_file
):
394 self
.log
.error("No such file: {}".format(self
.recipe_file
))
397 with
open(self
.recipe_file
) as fobj
:
398 self
.raw_recipe
= fobj
.readlines()
399 except FileNotFoundError
as error
:
400 self
.log
.error("No such file: {}".format(vm_xml_file
))
412 for line
in self
.raw_recipe
:
413 raw_line
= line
.split(":")
414 if len(raw_line
) < 2:
415 self
.log
.error("Error parsing the recipe in line {}".format(i
))
418 raw_line
= raw_line
[0].strip().split(" ")
419 if len(raw_line
) == 0:
420 self
.log
.error("Failed to parse the recipe in line {}".format(i
))
422 elif len(raw_line
) == 1:
423 if raw_line
[0] == "":
424 self
.log
.error("Failed to parse the recipe in line {}".format(i
))
426 machine
= raw_line
[0]
428 elif len(raw_line
) == 2:
429 machine
= raw_line
[0]
432 self
._recipe
.append((machine
.strip(), extra
.strip(), cmd
.strip()))
437 def __init__(self
, path
):
440 self
.path
= os
.path
.abspath(path
)
441 except BaseException
as e
:
442 self
.log
.error("Could not get absolute path")
444 self
.log
.debug(self
.path
)
446 self
.settings_file
= "{}/settings".format(self
.path
)
447 if not os
.path
.isfile(self
.settings_file
):
448 self
.log
.error("No such file: {}".format(self
.settings_file
))
450 self
.recipe_file
= "{}/recipe".format(self
.path
)
451 if not os
.path
.isfile(self
.recipe_file
):
452 self
.log
.error("No such file: {}".format(self
.recipe_file
))
454 def read_settings(self
):
455 self
.config
= configparser
.ConfigParser()
456 self
.config
.read(self
.settings_file
)
457 self
.name
= self
.config
["DEFAULT"]["Name"]
458 self
.description
= self
.config
["DEFAULT"]["Description"]
460 self
.virtual_environ_name
= self
.config
["VIRTUAL_ENVIRONMENT"]["Name"]
461 self
.virtual_environ_path
= self
.config
["VIRTUAL_ENVIRONMENT"]["Path"]
462 self
.virtual_environ_path
= os
.path
.normpath(self
.path
+ "/" + self
.virtual_environ_path
)
464 def virtual_environ_setup(self
):
465 self
.virtual_environ
= virtual_environ(self
.virtual_environ_path
)
467 self
.virtual_networks
= self
.virtual_environ
.get_networks()
469 self
.virtual_machines
= self
.virtual_environ
.get_machines()
471 def virtual_environ_start(self
):
474 def load_recipe(self
):
480 def virtual_environ_stop():
484 # Should return all vms and networks in a list
485 # and should provide the path to the necessary xml files
486 class virtual_environ():
487 def __init__(self
, path
):
490 self
.path
= os
.path
.abspath(path
)
491 except BaseException
as e
:
492 self
.log
.error("Could not get absolute path")
494 self
.log
.debug(self
.path
)
496 self
.settings_file
= "{}/settings".format(self
.path
)
497 if not os
.path
.isfile(self
.settings_file
):
498 self
.log
.error("No such file: {}".format(self
.settings_file
))
500 self
.log
.debug(self
.settings_file
)
501 self
.config
= configparser
.ConfigParser()
502 self
.config
.read(self
.settings_file
)
503 self
.name
= self
.config
["DEFAULT"]["name"]
504 self
.machines_string
= self
.config
["DEFAULT"]["machines"]
505 self
.networks_string
= self
.config
["DEFAULT"]["networks"]
508 for machine
in self
.machines_string
.split(","):
509 self
.machines
.append(machine
.strip())
512 for network
in self
.networks_string
.split(","):
513 self
.networks
.append(network
.strip())
515 self
.log
.debug(self
.machines
)
516 self
.log
.debug(self
.networks
)
518 def get_networks(self
):
520 for _network
in self
.networks
:
521 self
.log
.debug(_network
)
522 networks
.setdefault(_network
, network(os
.path
.normpath(self
.path
+ "/" + self
.config
[_network
]["xml_file"])))
525 def get_machines(self
):
527 for _machine
in self
.machines
:
528 self
.log
.debug(_machine
)
529 machines
.setdefault(_machine
, vm(
530 os
.path
.normpath(self
.path
+ "/" + self
.config
[_machine
]["xml_file"]),
531 os
.path
.normpath(self
.path
+ "/" + self
.config
[_machine
]["snapshot_xml_file"])))
536 if __name__
== "__main__":
539 parser
= argparse
.ArgumentParser()
541 parser
.add_argument("-d", "--directory", dest
="dir")
543 args
= parser
.parse_args()
545 _recipe
= recipe("/home/jonatan/python-testing-kvm/test/recipe")
546 currenttest
= test(args
.dir)
547 currenttest
.read_settings()
548 currenttest
.virtual_environ_setup()