Use one time to which all serial connection output is relative
[nitsi.git] / src / nitsi / serial_connection.py
1 #!/usr/bin/python3
2 import serial
3
4 import re
5 import os
6
7 from time import sleep
8 import sys
9 import logging
10
11 from nitsi.logger import TestFormatter
12
13 logger = logging.getLogger("nitsi.serial")
14
class serial_connection():
    """Buffered, line-oriented access to a machine's serial console.

    The class wraps a ``serial.Serial`` port. Bytes that were read ahead
    (for example while looking for the shell prompt) are kept in
    ``self.buffer`` so that ``read``/``peek``/``readline`` see them again.
    Every console line that is consumed through the high-level helpers is
    written to the ``output`` child logger and echoed to ``sys.stdout``.

    Console bytes are decoded with ``errors="replace"`` so that stray
    non-UTF-8 bytes on the line cannot abort a test run.
    """

    # Seconds slept between two polls of the port's input queue while waiting
    # for the amount of pending data to stop changing. Override on an
    # instance (or subclass) to tune for faster or slower consoles.
    poll_interval = 0.5

    # Text whose appearance after the password was sent is treated as a
    # rejected login. NOTE(review): this is the message printed by the usual
    # Linux ``login`` programs; adjust if the machine under test differs.
    login_failed_marker = "Login incorrect"

    def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None):
        """Open *device* and set up logging for this connection.

        Args:
            device: Port passed to ``serial.Serial``.
            username: Account used by ``login`` and expected in the prompt.
            log_file: Path that receives the console output. When ``None``
                no file handler is attached.
            name: Suffix for this connection's logger and name handed to
                ``TestFormatter``. When ``None`` the logger is named after
                the device instead.
            log_start_time: Passed to ``TestFormatter`` as ``start_time``.
        """
        self.buffer = b""
        self.back_at_prompt_pattern = None
        self.username = username
        self.name = name
        self.log_file = log_file

        # Logger.getChild() only accepts strings, so fall back to something
        # derived from the device when no name was given.
        if name is not None:
            log_name = name
        else:
            log_name = os.path.basename(str(device)) or "serial"
        self.log = logger.getChild(log_name)
        self.log.setLevel(logging.INFO)
        self.con = serial.Serial(device)

        self.log_output = self.log.getChild("output")
        if self.log_file is not None:
            log_file_handler = logging.FileHandler(self.log_file)
            log_file_handler.setLevel(logging.INFO)
            # Console lines already end with their own line break.
            log_file_handler.terminator = ""
            formatter = TestFormatter(name=self.name, start_time=log_start_time)
            log_file_handler.setFormatter(formatter)
            self.log_output.addHandler(log_file_handler)

    @staticmethod
    def _decode(data):
        """Decode console bytes, replacing undecodable bytes instead of raising."""
        return data.decode(errors="replace")

    def _wait_for_settled_input(self):
        """Poll the port until its pending byte count stops changing.

        Sleeps ``poll_interval`` seconds at least once and returns the
        port's ``in_waiting`` value afterwards.
        """
        seen = 0
        sleep(self.poll_interval)

        while seen != self.con.in_waiting:
            seen = self.con.in_waiting
            sleep(self.poll_interval)

        return self.con.in_waiting

    def read(self, size=1):
        """Return *size* bytes, serving buffered bytes before the port."""
        data, self.buffer = self.buffer[:size], self.buffer[size:]
        missing = size - len(data)
        if missing > 0:
            # The buffer could not satisfy the request; it is empty now.
            data += self.con.read(missing)
        return data

    def peek(self, size=1):
        """Return the next *size* bytes without consuming them.

        Missing bytes are read from the port and appended to the buffer.
        """
        missing = size - len(self.buffer)
        if missing > 0:
            self.buffer += self.con.read(missing)

        return self.buffer[:size]

    def readline(self):
        """Return the next line (including its ``\\n``) as bytes.

        Everything currently pending on the port is pulled into the buffer
        first. If the buffer then holds no complete line, the buffered bytes
        are returned together with the result of the port's ``readline()``.
        """
        self.log.debug(self.buffer)
        self.buffer += self.con.read(self.con.in_waiting)

        head, newline, rest = self.buffer.partition(b"\n")
        if newline:
            line = head + newline
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at {}".format(len(line)))
            self.buffer = rest
            self.log.debug(line)
            self.log.debug(self.buffer)
            return line

        line, self.buffer = self.buffer, b""
        return line + self.con.readline()

    def back_at_prompt(self):
        """Return True when the unread data starts with ``[<username>@...]#``.

        Nothing is consumed; the data that was inspected stays buffered.
        """
        if self.peek() != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string.
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        if self.back_at_prompt_pattern is None:
            # Escape the username so that characters such as "." or "+" in
            # it are matched literally.
            user = re.escape("{}".format(self.username))
            self.back_at_prompt_pattern = re.compile(
                r"^\[" + user + r"@.+\]#", re.MULTILINE)

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Send one decoded console line to the output logger and stdout."""
        self.log.debug("Get in function log_console_line()")
        self.log_output.info(line)
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Pending byte count of the port once it has stopped growing.

        Reading this property sleeps for at least ``poll_interval`` seconds.
        """
        return self._wait_for_settled_input()

    def line_in_buffer(self):
        """Return True when the buffer holds at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Log every complete line that is buffered or pending on the port."""
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: {}".format(len(self.buffer)))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                self.log_console_line(self._decode(self.readline()))

    def login(self, password):
        """Log in as ``self.username`` with *password*.

        Returns:
            True when the shell prompt for the user shows up (also when the
            console was already logged in). False when no username is set
            or when the console reports ``login_failed_marker``.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        self.log_console_line(self._decode(self.readline()))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines till we get "login: ". Without re.MULTILINE the
        # pattern only looks at the first unread line.
        login_pattern = re.compile(r"^.*login: ")
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting
            # we do not get the complete string.
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if login_pattern.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(data)
            self.log_console_line(self._decode(self.readline()))

        # We can login
        self.con.write("{}\n".format(self.username).encode())
        self.con.flush()
        # Read the echoed login name out of the buffer
        data = self.readline()
        self.log.debug("This is the login:{}".format(data))
        self.log_console_line(self._decode(data))

        # Give the console time to print the full "Password:" string; the
        # returned byte count itself is not needed.
        self._wait_for_settled_input()

        self.con.write("{}\n".format(password).encode())
        self.con.flush()
        # Print the 'Password:' line
        self.log_console_line(self._decode(self.readline()))

        while not self.back_at_prompt():
            line = self._decode(self.readline())
            self.log_console_line(line)
            if self.login_failed_marker in line:
                # Without this check a rejected password would make us wait
                # for a prompt that never appears.
                self.log.error("Login as {} failed".format(self.username))
                return False

        return True

    def write(self, string):
        """Encode *string* and send it to the port, flushing afterwards."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run *command* on the console and return its exit status.

        The command is sent followed by ``echo "END: $?"``. Output is logged
        line by line until the prompt is back.

        Returns:
            The last line read before the prompt, with ``"END: "`` removed
            and surrounding whitespace stripped, as a string.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first.
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer.
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        # The exit code is in data (the last line)
        last_line = self._decode(data)
        self.log.debug(last_line)
        exit_code = last_line.replace("END: ", "").strip()
        self.log.debug(exit_code)
        return exit_code