From: Jonatan Schlag Date: Fri, 27 Apr 2018 14:13:33 +0000 (+0200) Subject: Move the serial_connection class into a seperated file X-Git-Url: http://git.ipfire.org/?p=nitsi.git;a=commitdiff_plain;h=f0f16021f58f162fc2266686fddad3c0ef18995a Move the serial_connection class into a seperated file Signed-off-by: Jonatan Schlag --- diff --git a/serial_connection.py b/serial_connection.py new file mode 100644 index 0000000..4dc524e --- /dev/null +++ b/serial_connection.py @@ -0,0 +1,196 @@ +#!/usr/bin/python3 +import serial + +import re + +from time import sleep +import sys + +class serial_connection(): + def __init__(self, device, username=None): + self.buffer = b"" + self.back_at_prompt_pattern = None + self.username = username + self.log = log(1) + self.con = serial.Serial(device) + + def read(self, size=1): + if len(self.buffer) >= size: + # throw away first size bytes in buffer + data = self.buffer[:size] + # Set the buffer to the non used bytes + self.buffer = self.buffer[size:] + return data + else: + data = self.buffer + # Set the size to the value we have to read now + size = size - len(self.buffer) + # Set the buffer empty + self.buffer = b"" + return data + self.con.read(size) + + def peek(self, size=1): + if len(self.buffer) <= size: + self.buffer += self.con.read(size=size - len(self.buffer)) + + return self.buffer[:size] + + def readline(self): + self.log.debug(self.buffer) + self.buffer = self.buffer + self.con.read(self.con.in_waiting) + if b"\n" in self.buffer: + size = self.buffer.index(b"\n") + 1 + self.log.debug("We have a whole line in the buffer") + self.log.debug(self.buffer) + self.log.debug("We split at {}".format(size)) + data = self.buffer[:size] + self.buffer = self.buffer[size:] + self.log.debug(data) + self.log.debug(self.buffer) + return data + + data = self.buffer + self.buffer = b"" + return data + self.con.readline() + + def back_at_prompt(self): + data = self.peek() + if not data == b"[": + return False + + # We need to use self.in_waiting because with self.con.in_waiting we get + # not the complete string + size = len(self.buffer) + self.in_waiting + data = self.peek(size) + + + if self.back_at_prompt_pattern == None: + #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username) + self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE) + + if self.back_at_prompt_pattern.search(data.decode()): + return True + else: + return False + + def log_console_line(self, line): + self.log.debug("Get in function log_console_line()") + sys.stdout.write(line) + + @property + def in_waiting(self): + in_waiting_before = 0 + sleep(0.5) + + while in_waiting_before != self.con.in_waiting: + in_waiting_before = self.con.in_waiting + sleep(0.5) + + return self.con.in_waiting + + def line_in_buffer(self): + if b"\n" in self.buffer: + return True + + return False + + def print_lines_in_buffer(self): + while True: + self.log.debug("Fill buffer ...") + self.peek(len(self.buffer) + self.in_waiting) + self.log.debug("Current buffer length: {}".format(len(self.buffer))) + if self.line_in_buffer() == True: + while self.line_in_buffer() == True: + data = self.readline() + self.log_console_line(data.decode()) + else: + self.log.debug("We have printed all lines in the buffer") + break + + def login(self, password): + if self.username == None: + self.log.error("Username cannot be blank") + return False + + self.print_lines_in_buffer() + + # Hit enter to see what we get + self.con.write(b'\n') + # We get two new lines \r\n ? + data = self.readline() + self.log_console_line(data.decode()) + + self.print_lines_in_buffer() + + if self.back_at_prompt(): + self.log.debug("We are already logged in.") + return True + + # Read all line till we get login: + while 1: + # We need to use self.in_waiting because with self.con.in_waiting we get + # not the complete string + size = len(self.buffer) + self.in_waiting + data = self.peek(size) + + pattern = r"^.*login: " + pattern = re.compile(pattern) + + if pattern.search(data.decode()): + break + else: + self.log.debug("The pattern does not match") + self.log.debug(self.peek(len(self.buffer) + self.in_waiting)) + self.log_console_line(self.readline().decode()) + + # We can login + string = "{}\n".format(self.username) + self.con.write(string.encode()) + self.con.flush() + # read the login out of the buffer + data = self.readline() + self.log.debug("This is the login:{}".format(data)) + self.log_console_line(data.decode()) + + # We need to wait her till we get the full string "Password:" + #This is useless but self.in_waiting will wait the correct amount of time + size = self.in_waiting + + string = "{}\n".format(password) + self.con.write(string.encode()) + self.con.flush() + # Print the 'Password:' line + data = self.readline() + self.log_console_line(data.decode()) + + while not self.back_at_prompt(): + # This will fail if the login failed so we need to look for the failed keyword + data = self.readline() + self.log_console_line(data.decode()) + + return True + + def write(self, string): + self.log.debug(string) + self.con.write(string.encode()) + self.con.flush() + + def command(self, command): + self.write("{}; echo \"END: $?\"\n".format(command)) + + # We need to read out the prompt for this command first + # If we do not do this we will break the loop immediately + # because the prompt for this command is still in the buffer + data = self.readline() + self.log_console_line(data.decode()) + + while not self.back_at_prompt(): + data = self.readline() + self.log_console_line(data.decode()) + + # We saved our exit code in data (the last line) + self.log.debug(data.decode()) + data = data.decode().replace("END: ", "") + self.log.debug(data) + self.log.debug(data.strip()) + return data.strip() \ No newline at end of file diff --git a/test.py b/test.py index c13917d..b0385d3 100755 --- a/test.py +++ b/test.py @@ -1,10 +1,5 @@ #!/usr/bin/python3 -import serial - -import re -from time import sleep -import sys import libvirt @@ -163,193 +158,6 @@ class machine(): self.disk.umount("/") self.disk.close() -class serial_connection(): - def __init__(self, device, username=None): - self.buffer = b"" - self.back_at_prompt_pattern = None - self.username = username - self.log = log(1) - self.con = serial.Serial(device) - - def read(self, size=1): - if len(self.buffer) >= size: - # throw away first size bytes in buffer - data = self.buffer[:size] - # Set the buffer to the non used bytes - self.buffer = self.buffer[size:] - return data - else: - data = self.buffer - # Set the size to the value we have to read now - size = size - len(self.buffer) - # Set the buffer empty - self.buffer = b"" - return data + self.con.read(size) - - def peek(self, size=1): - if len(self.buffer) <= size: - self.buffer += self.con.read(size=size - len(self.buffer)) - - return self.buffer[:size] - - def readline(self): - self.log.debug(self.buffer) - self.buffer = self.buffer + self.con.read(self.con.in_waiting) - if b"\n" in self.buffer: - size = self.buffer.index(b"\n") + 1 - self.log.debug("We have a whole line in the buffer") - self.log.debug(self.buffer) - self.log.debug("We split at {}".format(size)) - data = self.buffer[:size] - self.buffer = self.buffer[size:] - self.log.debug(data) - self.log.debug(self.buffer) - return data - - data = self.buffer - self.buffer = b"" - return data + self.con.readline() - - def back_at_prompt(self): - data = self.peek() - if not data == b"[": - return False - - # We need to use self.in_waiting because with self.con.in_waiting we get - # not the complete string - size = len(self.buffer) + self.in_waiting - data = self.peek(size) - - - if self.back_at_prompt_pattern == None: - #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username) - self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE) - - if self.back_at_prompt_pattern.search(data.decode()): - return True - else: - return False - - def log_console_line(self, line): - self.log.debug("Get in function log_console_line()") - sys.stdout.write(line) - - @property - def in_waiting(self): - in_waiting_before = 0 - sleep(0.5) - - while in_waiting_before != self.con.in_waiting: - in_waiting_before = self.con.in_waiting - sleep(0.5) - - return self.con.in_waiting - - def line_in_buffer(self): - if b"\n" in self.buffer: - return True - - return False - - def print_lines_in_buffer(self): - while True: - self.log.debug("Fill buffer ...") - self.peek(len(self.buffer) + self.in_waiting) - self.log.debug("Current buffer length: {}".format(len(self.buffer))) - if self.line_in_buffer() == True: - while self.line_in_buffer() == True: - data = self.readline() - self.log_console_line(data.decode()) - else: - self.log.debug("We have printed all lines in the buffer") - break - - def login(self, password): - if self.username == None: - self.log.error("Username cannot be blank") - return False - - self.print_lines_in_buffer() - - # Hit enter to see what we get - self.con.write(b'\n') - # We get two new lines \r\n ? - data = self.readline() - self.log_console_line(data.decode()) - - self.print_lines_in_buffer() - - if self.back_at_prompt(): - self.log.debug("We are already logged in.") - return True - - # Read all line till we get login: - while 1: - # We need to use self.in_waiting because with self.con.in_waiting we get - # not the complete string - size = len(self.buffer) + self.in_waiting - data = self.peek(size) - - pattern = r"^.*login: " - pattern = re.compile(pattern) - - if pattern.search(data.decode()): - break - else: - self.log.debug("The pattern does not match") - self.log_console_line(self.readline().decode()) - - # We can login - string = "{}\n".format(self.username) - self.con.write(string.encode()) - self.con.flush() - # read the login out of the buffer - data = self.readline() - self.log.debug("This is the login:{}".format(data)) - self.log_console_line(data.decode()) - - # We need to wait her till we get the full string "Password:" - #This is useless but self.in_waiting will wait the correct amount of time - size = self.in_waiting - - string = "{}\n".format(password) - self.con.write(string.encode()) - self.con.flush() - # Print the 'Password:' line - data = self.readline() - self.log_console_line(data.decode()) - - while not self.back_at_prompt(): - # This will fail if the login failed so we need to look for the failed keyword - data = self.readline() - self.log_console_line(data.decode()) - - return True - - def write(self, string): - self.log.debug(string) - self.con.write(string.encode()) - self.con.flush() - - def command(self, command): - self.write("{}; echo \"END: $?\"\n".format(command)) - - # We need to read out the prompt for this command first - # If we do not do this we will break the loop immediately - # because the prompt for this command is still in the buffer - data = self.readline() - self.log_console_line(data.decode()) - - while not self.back_at_prompt(): - data = self.readline() - self.log_console_line(data.decode()) - - # We saved our exit code in data (the last line) - self.log.debug(data.decode()) - data = data.decode().replace("END: ", "") - self.log.debug(data) - self.log.debug(data.strip()) - return data.strip() # A class which define and undefine a virtual network based on an xml file