Move the serial_connection class into a seperated file
authorJonatan Schlag <jonatan.schlag@ipfire.org>
Fri, 27 Apr 2018 14:13:33 +0000 (16:13 +0200)
committerJonatan Schlag <jonatan.schlag@ipfire.org>
Fri, 27 Apr 2018 14:13:33 +0000 (16:13 +0200)
Signed-off-by: Jonatan Schlag <jonatan.schlag@ipfire.org>
serial_connection.py [new file with mode: 0644]
test.py

diff --git a/serial_connection.py b/serial_connection.py
new file mode 100644 (file)
index 0000000..4dc524e
--- /dev/null
@@ -0,0 +1,196 @@
+#!/usr/bin/python3
+import serial
+
+import re
+
+from time import sleep
+import sys
+
+class serial_connection():
+    def __init__(self, device, username=None):
+        self.buffer = b""
+        self.back_at_prompt_pattern =  None
+        self.username = username
+        self.log = log(1)
+        self.con = serial.Serial(device)
+
+    def read(self, size=1):
+        if len(self.buffer) >= size:
+            # throw away first size bytes in buffer
+            data =  self.buffer[:size]
+            # Set the buffer to the non used bytes
+            self.buffer = self.buffer[size:]
+            return data
+        else:
+            data = self.buffer
+            # Set the size to the value we have to read now
+            size = size - len(self.buffer)
+            # Set the buffer empty
+            self.buffer = b""
+            return data + self.con.read(size)
+
+    def peek(self, size=1):
+        if len(self.buffer) <= size:
+            self.buffer += self.con.read(size=size - len(self.buffer))
+
+        return self.buffer[:size]
+
+    def readline(self):
+        self.log.debug(self.buffer)
+        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
+        if b"\n" in self.buffer:
+            size = self.buffer.index(b"\n") + 1
+            self.log.debug("We have a whole line in the buffer")
+            self.log.debug(self.buffer)
+            self.log.debug("We split at {}".format(size))
+            data = self.buffer[:size]
+            self.buffer = self.buffer[size:]
+            self.log.debug(data)
+            self.log.debug(self.buffer)
+            return data
+
+        data = self.buffer
+        self.buffer = b""
+        return data + self.con.readline()
+
+    def back_at_prompt(self):
+        data = self.peek()
+        if not data == b"[":
+            return False
+
+        # We need to use self.in_waiting because with self.con.in_waiting we get
+        # not the complete string
+        size = len(self.buffer) + self.in_waiting
+        data = self.peek(size)
+
+
+        if self.back_at_prompt_pattern == None:
+            #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
+            self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
+
+        if self.back_at_prompt_pattern.search(data.decode()):
+            return True
+        else:
+            return False
+
+    def log_console_line(self, line):
+        self.log.debug("Get in function log_console_line()")
+        sys.stdout.write(line)
+
+    @property
+    def in_waiting(self):
+        in_waiting_before = 0
+        sleep(0.5)
+
+        while in_waiting_before != self.con.in_waiting:
+            in_waiting_before = self.con.in_waiting
+            sleep(0.5)
+
+        return self.con.in_waiting
+
+    def line_in_buffer(self):
+        if b"\n" in self.buffer:
+            return True
+
+        return False
+
+    def print_lines_in_buffer(self):
+        while True:
+            self.log.debug("Fill buffer ...")
+            self.peek(len(self.buffer) + self.in_waiting)
+            self.log.debug("Current buffer length: {}".format(len(self.buffer)))
+            if self.line_in_buffer() == True:
+                while self.line_in_buffer() == True:
+                    data = self.readline()
+                    self.log_console_line(data.decode())
+            else:
+                self.log.debug("We have printed all lines in the buffer")
+                break
+
+    def login(self, password):
+        if self.username == None:
+            self.log.error("Username cannot be blank")
+            return False
+
+        self.print_lines_in_buffer()
+
+        # Hit enter to see what we get
+        self.con.write(b'\n')
+        # We get two new lines \r\n ?
+        data = self.readline()
+        self.log_console_line(data.decode())
+
+        self.print_lines_in_buffer()
+
+        if self.back_at_prompt():
+            self.log.debug("We are already logged in.")
+            return True
+
+        # Read all line till we get login:
+        while 1:
+            # We need to use self.in_waiting because with self.con.in_waiting we get
+            # not the complete string
+            size = len(self.buffer) + self.in_waiting
+            data = self.peek(size)
+
+            pattern = r"^.*login: "
+            pattern = re.compile(pattern)
+
+            if pattern.search(data.decode()):
+                break
+            else:
+                self.log.debug("The pattern does not match")
+                self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
+                self.log_console_line(self.readline().decode())
+
+        # We can login
+        string = "{}\n".format(self.username)
+        self.con.write(string.encode())
+        self.con.flush()
+        # read the login out of the buffer
+        data = self.readline()
+        self.log.debug("This is the login:{}".format(data))
+        self.log_console_line(data.decode())
+
+        # We need to wait her till we get the full string "Password:"
+        #This is useless but self.in_waiting will wait the correct amount of time
+        size = self.in_waiting
+
+        string = "{}\n".format(password)
+        self.con.write(string.encode())
+        self.con.flush()
+        # Print the 'Password:' line
+        data = self.readline()
+        self.log_console_line(data.decode())
+
+        while not self.back_at_prompt():
+            # This will fail if the login failed so we need to look for the failed keyword
+            data = self.readline()
+            self.log_console_line(data.decode())
+
+        return True
+
+    def write(self, string):
+        self.log.debug(string)
+        self.con.write(string.encode())
+        self.con.flush()
+
+    def command(self, command):
+        self.write("{}; echo \"END: $?\"\n".format(command))
+
+        # We need to read out the prompt for this command first
+        # If we do not do this we will break the loop immediately
+        # because the prompt for this command is still in the buffer
+        data = self.readline()
+        self.log_console_line(data.decode())
+
+        while not self.back_at_prompt():
+            data = self.readline()
+            self.log_console_line(data.decode())
+
+        # We saved our exit code in data (the last line)
+        self.log.debug(data.decode())
+        data = data.decode().replace("END: ", "")
+        self.log.debug(data)
+        self.log.debug(data.strip())
+        return data.strip()
\ No newline at end of file
diff --git a/test.py b/test.py
index c13917d..b0385d3 100755 (executable)
--- a/test.py
+++ b/test.py
@@ -1,10 +1,5 @@
 #!/usr/bin/python3
 
-import serial
-
-import re
-from time import sleep
-import sys
 
 import libvirt
 
@@ -163,193 +158,6 @@ class machine():
             self.disk.umount("/")
             self.disk.close()
 
-class serial_connection():
-    def __init__(self, device, username=None):
-        self.buffer = b""
-        self.back_at_prompt_pattern =  None
-        self.username = username
-        self.log = log(1)
-        self.con = serial.Serial(device)
-
-    def read(self, size=1):
-        if len(self.buffer) >= size:
-            # throw away first size bytes in buffer
-            data =  self.buffer[:size]
-            # Set the buffer to the non used bytes
-            self.buffer = self.buffer[size:]
-            return data
-        else:
-            data = self.buffer
-            # Set the size to the value we have to read now
-            size = size - len(self.buffer)
-            # Set the buffer empty
-            self.buffer = b""
-            return data + self.con.read(size)
-
-    def peek(self, size=1):
-        if len(self.buffer) <= size:
-            self.buffer += self.con.read(size=size - len(self.buffer))
-
-        return self.buffer[:size]
-
-    def readline(self):
-        self.log.debug(self.buffer)
-        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
-        if b"\n" in self.buffer:
-            size = self.buffer.index(b"\n") + 1
-            self.log.debug("We have a whole line in the buffer")
-            self.log.debug(self.buffer)
-            self.log.debug("We split at {}".format(size))
-            data = self.buffer[:size]
-            self.buffer = self.buffer[size:]
-            self.log.debug(data)
-            self.log.debug(self.buffer)
-            return data
-
-        data = self.buffer
-        self.buffer = b""
-        return data + self.con.readline()
-
-    def back_at_prompt(self):
-        data = self.peek()
-        if not data == b"[":
-            return False
-
-        # We need to use self.in_waiting because with self.con.in_waiting we get
-        # not the complete string
-        size = len(self.buffer) + self.in_waiting
-        data = self.peek(size)
-
-
-        if self.back_at_prompt_pattern == None:
-            #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
-            self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
-
-        if self.back_at_prompt_pattern.search(data.decode()):
-            return True
-        else:
-            return False
-
-    def log_console_line(self, line):
-        self.log.debug("Get in function log_console_line()")
-        sys.stdout.write(line)
-
-    @property
-    def in_waiting(self):
-        in_waiting_before = 0
-        sleep(0.5)
-
-        while in_waiting_before != self.con.in_waiting:
-            in_waiting_before = self.con.in_waiting
-            sleep(0.5)
-
-        return self.con.in_waiting
-
-    def line_in_buffer(self):
-        if b"\n" in self.buffer:
-            return True
-
-        return False
-
-    def print_lines_in_buffer(self):
-        while True:
-            self.log.debug("Fill buffer ...")
-            self.peek(len(self.buffer) + self.in_waiting)
-            self.log.debug("Current buffer length: {}".format(len(self.buffer)))
-            if self.line_in_buffer() == True:
-                while self.line_in_buffer() == True:
-                    data = self.readline()
-                    self.log_console_line(data.decode())
-            else:
-                self.log.debug("We have printed all lines in the buffer")
-                break
-
-    def login(self, password):
-        if self.username == None:
-            self.log.error("Username cannot be blank")
-            return False
-
-        self.print_lines_in_buffer()
-
-        # Hit enter to see what we get
-        self.con.write(b'\n')
-        # We get two new lines \r\n ?
-        data = self.readline()
-        self.log_console_line(data.decode())
-
-        self.print_lines_in_buffer()
-
-        if self.back_at_prompt():
-            self.log.debug("We are already logged in.")
-            return True
-
-        # Read all line till we get login:
-        while 1:
-            # We need to use self.in_waiting because with self.con.in_waiting we get
-            # not the complete string
-            size = len(self.buffer) + self.in_waiting
-            data = self.peek(size)
-
-            pattern = r"^.*login: "
-            pattern = re.compile(pattern)
-
-            if pattern.search(data.decode()):
-                break
-            else:
-                self.log.debug("The pattern does not match")
-                self.log_console_line(self.readline().decode())
-
-        # We can login
-        string = "{}\n".format(self.username)
-        self.con.write(string.encode())
-        self.con.flush()
-        # read the login out of the buffer
-        data = self.readline()
-        self.log.debug("This is the login:{}".format(data))
-        self.log_console_line(data.decode())
-
-        # We need to wait her till we get the full string "Password:"
-        #This is useless but self.in_waiting will wait the correct amount of time
-        size = self.in_waiting
-
-        string = "{}\n".format(password)
-        self.con.write(string.encode())
-        self.con.flush()
-        # Print the 'Password:' line
-        data = self.readline()
-        self.log_console_line(data.decode())
-
-        while not self.back_at_prompt():
-            # This will fail if the login failed so we need to look for the failed keyword
-            data = self.readline()
-            self.log_console_line(data.decode())
-
-        return True
-
-    def write(self, string):
-        self.log.debug(string)
-        self.con.write(string.encode())
-        self.con.flush()
-
-    def command(self, command):
-        self.write("{}; echo \"END: $?\"\n".format(command))
-
-        # We need to read out the prompt for this command first
-        # If we do not do this we will break the loop immediately
-        # because the prompt for this command is still in the buffer
-        data = self.readline()
-        self.log_console_line(data.decode())
-
-        while not self.back_at_prompt():
-            data = self.readline()
-            self.log_console_line(data.decode())
-
-        # We saved our exit code in data (the last line)
-        self.log.debug(data.decode())
-        data = data.decode().replace("END: ", "")
-        self.log.debug(data)
-        self.log.debug(data.strip())
-        return data.strip()
 
 
 # A class which define and undefine a virtual network based on an xml file