Initial checkin
[nitsi.git] / test2.py
1 #!/usr/bin/python3
2
3 import serial
4
5 import re
6 from time import sleep
7 import sys
8
9 import libvirt
10
11 import xml.etree.ElementTree as ET
12
13 class test():
14 def __init__(self, name):
15 pass
16
17 def read_settings(self):
18 pass
19
20 class log():
21 def __init__(self, log_level):
22 self.log_level = log_level
23
24 def debug(self, string):
25 if self.log_level >= 4:
26 print("DEBUG: {}".format(string))
27
28 def error(self, string):
29 print("ERROR: {}".format(string))
30
31 class vm():
32 def __init__(self, uri):
33 self.log = log(4)
34 try:
35 self.con = libvirt.open(uri)
36 except BaseException as error:
37 self.log.error("Could not connect to: {}".format(uri))
38
39 self.log.debug("Connected to: {}".format(uri))
40
41 def get_domain_from_name(self, name):
42 dom = self.con.lookupByName(name)
43
44 if dom == None:
45 raise BaseException
46
47 return dom
48
49 def domain_is_running(self, dom):
50 if dom == None:
51 return False
52
53 state, reason = dom.state()
54
55 if state == libvirt.VIR_DOMAIN_RUNNING:
56 return True
57 else:
58 return False
59
60 def domain_get_serial_device(self, dom):
61 if dom == None:
62 raise BaseException
63
64 if not self.domain_is_running(dom):
65 raise BaseException
66
67 xml_root = ET.fromstring(dom.XMLDesc(0))
68
69 elem = xml_root.find("./devices/serial/source")
70 return elem.get("path")
71
72 def domain_start_from_xml(self, xml):
73 dom = self.con.createXML(xml, 0)
74 if dom == None:
75 self.log.error("Failed to create the VM")
76 raise BaseException
77
78 return dom
79
80 def domain_start_from_xml_file(self, file):
81 try:
82 fobj = open(file)
83 except FileNotFoundError as error:
84 self.log.error("No such file: {}".format(file))
85
86 return self.domain_start_from_xml(fobj.read())
87
88 def check_domain_is_booted_up(self, dom):
89 serial_con = connection(self.domain_get_serial_device(dom))
90
91 serial_con.write("\n")
92 # This will block till the domain is booted up
93 serial_con.read(1)
94
95
96
97
98
99 # try:
100 # dom = conn.lookupByUUIDString(uuid)
101 # except:
102 # flash(u"Failed to get the domain object", 'alert-danger')
103 # print('Failed to get the domain object', file=sys.stderr)
104 # conn.close()
105 # return redirect("/vm")
106
107 # domname = dom.name()
108 # if action == "start":
109 # try:
110 # dom.create()
111 # except:
112 # flash(u"Can not boot guest domain.", 'alert-danger')
113 # print('Can not boot guest domain.', file=sys.stderr)
114 # conn.close()
115 # return redirect("/vm")
116
117 # flash(u"Sucessfully started Domain \"{}\"".format(domname), 'alert-info')
118 # conn.close()
119 # return redirect("/vm")
120
121 # elif action == "shutdown":
122 # try:
123 # dom.shutdown()
124 # except:
125 # flash(u"Can not shutdown guest domain.", 'alert-danger')
126 # print('Can not shutdown guest domain.', file=sys.stderr)
127 # conn.close()
128 # return redirect("/vm")
129
130 # flash(u"Sucessfully shutdowned Domain \"{}\"".format(domname), 'alert-info')
131 # conn.close()
132 # return redirect("/vm")
133
134 # elif action == "destroy":
135 # try:
136 # dom.destroy()
137 # except:
138 # flash(u"Can not destroy guest domain.", 'alert-danger')
139 # print('Can not destroy guest domain.', file=sys.stderr)
140 # conn.close()
141 # return redirect("/vm")
142
143 # flash(u"Sucessfully destroyed Domain \"{}\"".format(domname), 'alert-info')
144 # conn.close()
145 # return redirect("/vm")
146
147 # elif action == "pause":
148 # try:
149 # dom.suspend()
150 # except:
151 # flash(u"Can not pause guest domain.", 'alert-danger')
152 # print('Can not pause guest domain.', file=sys.stderr)
153 # conn.close()
154 # return redirect("/vm")
155
156 # flash(u"Sucessfully paused Domain \"{}\"".format(domname), 'alert-info')
157 # conn.close()
158 # return redirect("/vm")
159
160 # elif action == "resume":
161 # try:
162 # dom.resume()
163 # except:
164 # flash(u"Can not eesume guest domain.:", 'alert-danger')
165 # print('Can not resume guest domain.', file=sys.stderr)
166 # conn.close()
167 # return redirect("/vm")
168
169 # flash(u"Sucessfully resumed Domain \"{}\"".format(domname), 'alert-info')
170 # conn.close()
171 # return redirect("/vm")
172
173 # else:
174 # flash(u"No such action: \"{}\"".format(action), 'alert-warning')
175 # conn.close()
176 # return redirect("/vm")
177
178
179 # @vms.route('/vm')
180 # @login_required
181 # def vm_overview():
182 # import sys
183 # import libvirt
184 # conn = libvirt.open('qemu:///system')
185 # doms = conn.listAllDomains(0)
186 # domains = []
187
188 # if len(doms) != 0:
189 # for dom in doms:
190 # domain = {}
191 # domain.setdefault("name", dom.name())
192 # domain.setdefault("uuid", dom.UUIDString())
193 # state, reason = dom.state()
194 # domain.setdefault("state", dom_state(state))
195 # domains.append(domain)
196
197 # conn.close()
198
199 # return render_template("virtberry_vm_basic-vm.html", domains=domains)
200
201
202
203 class connection():
204 def __init__(self, device, username=None):
205 self.buffer = b""
206 self.back_at_prompt_pattern = None
207 self.username = username
208 self.log = log(1)
209 self.con = serial.Serial(device)
210 # # Just press enter one time to see what we get
211 # self.con.write(b'\n')
212 # # We get two new lines \r\n ?
213 # data = self.readline()
214 # self.log_console_line(data.decode())
215
216
217 # if not self.back_at_prompt():
218 # self.log.debug("We need to login")
219 # if not self.login(password):
220 # self.log.error("Login failed")
221 # return False
222 # else:
223 # self.log.debug("We are logged in")
224
225
226 ''' in_waiting_before = 0
227 sleep(1)
228
229 while in_waiting_before != self.con.in_waiting:
230 in_waiting_before = self.con.in_waiting
231 sleep(0.5)
232
233 print(self.con.in_waiting)
234 data = self.con.read(self.con.in_waiting)
235 print(data)
236 print(data.decode(),end='')
237
238 string = 'root\n'
239 self.con.write(string.encode())
240 self.con.flush() '''
241
242 ''' in_waiting_before = 0
243 sleep(1)
244
245 while in_waiting_before != self.con.in_waiting:
246 in_waiting_before = self.con.in_waiting
247 sleep(0.5)
248
249 print(self.con.in_waiting)
250 data = self.con.read(self.con.in_waiting)
251 print(data)
252 print(data.decode(), end='')
253
254 string = '25814@root\n'
255 self.con.write(string.encode())
256 self.con.flush()
257
258 in_waiting_before = 0
259 sleep(1)
260
261 while in_waiting_before != self.con.in_waiting:
262 in_waiting_before = self.con.in_waiting
263 sleep(0.5)
264
265 print(self.con.in_waiting)
266 data = self.con.read(self.con.in_waiting)
267 print(data)
268 print(data.decode(), end='') '''
269
270 # check if we already logged in
271 # If we we get something like [root@localhost ~]#
272 #self.readline()
273
274 # if not self.check_logged_in(username):
275 #print("Try to login")
276 #if self.login(username, password):
277 # print("Could not login")
278 # return False
279
280 #pattern = "^\[" + username + "@.+\]#"
281 #print(pattern)
282 #data = self.readline(pattern=pattern)
283 #if data["return-code"] == 1:
284 # print("We are logged in")
285 # else:
286 # print("We are not logged in")
287 # login
288
289 #while 1:
290 #data = self.readline("^.*login:")
291 # if data["return-code"] == 1:
292 # break
293
294 # string = 'cd / && ls \n'
295 # self.con.write(string.encode())
296 # self.con.flush()
297 # #print(self.con.read(5))
298
299 # data = self.readline()
300 # self.log_console_line(data.decode())
301
302 # while not self.back_at_prompt():
303 # data = self.readline()
304 # self.log_console_line(data.decode())
305
306 ''' in_waiting_before = 0
307 sleep(1)
308
309 while in_waiting_before != self.con.in_waiting:
310 in_waiting_before = self.con.in_waiting
311 sleep(0.5)
312
313 print(self.con.in_waiting)
314 data = self.con.read(self.con.in_waiting)
315 data = data.decode()
316
317 pattern = "^\[" + username + "@.+\]# $"
318 pattern = re.compile(pattern, re.MULTILINE)
319 if pattern.match(data, re.MULTILINE):
320 print("It works")
321
322 print(data, end='') '''
323
324
325 #@property
326 #def con(self):
327 # return self.con
328
329
330 def read(self, size=1):
331 if len(self.buffer) >= size:
332 # throw away first size bytes in buffer
333 data = self.buffer[:size]
334 # Set the buffer to the non used bytes
335 self.buffer = self.buffer[size:]
336 return data
337 else:
338 data = self.buffer
339 # Set the size to the value we have to read now
340 size = size - len(self.buffer)
341 # Set the buffer empty
342 self.buffer = b""
343 return data + self.con.read(size)
344
345 def peek(self, size=1):
346 if len(self.buffer) <= size:
347 self.buffer += self.con.read(size=size - len(self.buffer))
348
349 return self.buffer[:size]
350
351 def readline(self):
352 self.log.debug(self.buffer)
353 self.buffer = self.buffer + self.con.read(self.con.in_waiting)
354 if b"\n" in self.buffer:
355 size = self.buffer.index(b"\n") + 1
356 self.log.debug("We have a whole line in the buffer")
357 self.log.debug(self.buffer)
358 self.log.debug("We split at {}".format(size))
359 data = self.buffer[:size]
360 self.buffer = self.buffer[size:]
361 self.log.debug(data)
362 self.log.debug(self.buffer)
363 return data
364
365 data = self.buffer
366 self.buffer = b""
367 return data + self.con.readline()
368
369 def back_at_prompt(self):
370 data = self.peek()
371 if not data == b"[":
372 return False
373
374 # We need to use self.in_waiting because with self.con.in_waiting we get
375 # not the complete string
376 size = len(self.buffer) + self.in_waiting
377 data = self.peek(size)
378
379
380 if self.back_at_prompt_pattern == None:
381 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
382 self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
383
384 if self.back_at_prompt_pattern.search(data.decode()):
385 return True
386 else:
387 return False
388
389 def log_console_line(self, line):
390 self.log.debug("Get in function log_console_line()")
391 sys.stdout.write(line)
392
393 @property
394 def in_waiting(self):
395 in_waiting_before = 0
396 sleep(0.5)
397
398 while in_waiting_before != self.con.in_waiting:
399 in_waiting_before = self.con.in_waiting
400 sleep(0.5)
401
402 return self.con.in_waiting
403
404
405 def readline2(self, pattern=None):
406 string = ""
407 string2 = b""
408 if pattern:
409 pattern = re.compile(pattern)
410
411 while 1:
412 char = self.con.read(1)
413 string = string + char.decode("utf-8")
414 string2 = string2 + char
415 #print(char)
416 print(char.decode("utf-8"), end="")
417
418 #print(string2)
419 if pattern and pattern.match(string):
420 #print("get here1")
421 #print(string2)
422 return {"string" : string, "return-code" : 1}
423
424 if char == b"\n":
425 #print(char)
426 #print(string2)
427 #print("get here2")
428 return {"return-code" : 0}
429
430 def check_logged_in(self, username):
431 pattern = "^\[" + username + "@.+\]#"
432 data = self.readline(pattern=pattern)
433 if data["return-code"] == 1:
434 print("We are logged in")
435 return True
436 else:
437 print("We are not logged in")
438 return False
439
440 def login(self, password):
441 if self.username == None:
442 self.log.error("Username cannot be blank")
443 return False
444
445 # Hit enter to see what we get
446 self.con.write(b'\n')
447 # We get two new lines \r\n ?
448 data = self.readline()
449 self.log_console_line(data.decode())
450
451
452 if self.back_at_prompt():
453 self.log.debug("We are already logged in.")
454 return True
455
456 # Read all line till we get login:
457 while 1:
458 data = self.peek()
459 if not data.decode() == "l":
460 self.log.debug("We get no l at the start")
461 self.log_console_line(self.readline().decode())
462
463 # We need to use self.in_waiting because with self.con.in_waiting we get
464 # not the complete string
465 size = len(self.buffer) + self.in_waiting
466 data = self.peek(size)
467
468 pattern = r"^.*login: "
469 pattern = re.compile(pattern)
470
471 if pattern.search(data.decode()):
472 break
473 else:
474 self.log.debug("The pattern does not match")
475 self.log_console_line(self.readline().decode())
476
477 # We can login
478 string = "{}\n".format(self.username)
479 self.con.write(string.encode())
480 self.con.flush()
481 # read the login out of the buffer
482 data = self.readline()
483 self.log.debug("This is the login:{}".format(data))
484 self.log_console_line(data.decode())
485
486 # We need to wait her till we get the full string "Password:"
487 #This is useless but self.in_waiting will wait the correct amount of time
488 size = self.in_waiting
489
490 string = "{}\n".format(password)
491 self.con.write(string.encode())
492 self.con.flush()
493 # Print the 'Password:' line
494 data = self.readline()
495 self.log_console_line(data.decode())
496
497 while not self.back_at_prompt():
498 # This will fail if the login failed so we need to look for the failed keyword
499 data = self.readline()
500 self.log_console_line(data.decode())
501
502 return True
503
504 def write(self, string):
505 self.log.debug(string)
506 self.con.write(string.encode())
507 self.con.flush()
508
509 def command(self, command):
510 self.write("{}\n".format(command))
511
512 # We need to read out the prompt for this command first
513 # If we do not do this we will break the loop immediately
514 # because the prompt for this command is still in the buffer
515 data = self.readline()
516 self.log_console_line(data.decode())
517
518 while not self.back_at_prompt():
519 data = self.readline()
520 self.log_console_line(data.decode())
521
522
523 # while True:
524
525 # line = self.readline()
526
527 # print (line)
528
529 # print("Hello")
530
531 # print("World")
532
533 # Hello
534 # World
535
536 # Hello World
537
538 # # Peek for prompt?
539 # if self.back_at_prompt():
540 # break
541
542 # def back_at_prompt():
543 # data = self.peek()
544
545 # if not char == "[":
546 # return False
547
548 # data = self.peek(in_waiting)
549 # m = re.search(..., data)
550 # if m:
551 # return True
552
553
554
555
556
557 # pattern = r"^\[root@.+\]#"
558
559 # pattern = re.compile(pattern, re.MULTILINE)
560
561 # data = """cd / && ls
562 # bin dev home lib64 media opt root sbin sys usr
563 # boot etc lib lost+found mnt proc run srv tmp var
564 # [root@localhost /]# """
565
566 # #data = "[root@localhost /]# "
567 # data2 = pattern.search(data)
568
569 # #if pattern.search(data):
570 # # print("get here")
571
572 # #print(data2.group())
573
574
575
576 vm = vm("qemu:///system")
577
578 dom = vm.domain_start_from_xml_file("/home/jonatan/python-testing-kvm/centos2.xml")
579
580 # This block till the vm is booted
581 print("Waiting till the domain is booted up")
582 vm.check_domain_is_booted_up(dom)
583 print("Domain is booted up")
584 #vm.domain_get_serial_device(dom)
585 #vm.domain_get_serial_device(dom)
586
587 versuch1 = connection(vm.domain_get_serial_device(dom), username="root")
588 versuch1.login("25814@root")
589 versuch1.command("cd / && ls")
590
591
592 # def command(self, command):
593 # self._send_command(command)
594
595 # while True:
596 # line = self.readline()
597
598 # print (line)
599
600 # print("Hello")
601
602 # print("World")
603
604 # Hello
605 # World
606
607 # Hello World
608
609 # # Peek for prompt?
610 # if self.back_at_prompt():
611 # break
612
613 # def back_at_prompt():
614 # data = self.peek()
615
616 # if not char == "[":
617 # return False
618
619 # data = self.peek(in_waiting)
620 # m = re.search(..., data)
621 # if m:
622 # return True
623
624
625
626
627
628 # class connection()
629 # buffer = b""
630
631 # def read(self, size=1):
632 # if len(buffer) >= size:
633 # # throw away first size bytes in buffer
634 # data, buffer = buffer[:size], buffer[size:]
635 # return data
636
637 # return self.serial.read(size)
638
639 # def peek(self, size=1):
640 # if len(buffer) <= size:
641 # buffer += self.serial.read(size=size - len(buffer))
642
643 # return buffer[:size]
644
645
646 # def readline(self):
647 # buffer = buffer + self.serial.read(in_wating)
648 # if "\n" in buffer:
649 # return alle zeichen bis zum \n
650
651 # return buffer + self.serial.readline()