Format the name in the log file always with the same length
[nitsi.git] / src / nitsi / serial_connection.py
1 #!/usr/bin/python3
2 import serial
3
4 import re
5 import os
6
7 from time import sleep
8 import sys
9 import logging
10
11 from nitsi.logger import TestFormatter
12
13 logger = logging.getLogger("nitsi.serial")
14
class serial_connection():
    """Buffered, line-oriented access to the serial console of a machine.

    The class wraps a ``serial.Serial`` object and keeps its own byte buffer
    (``self.buffer``) in front of it so that data can be peeked at without
    being consumed.  Every console line that is read while logging in or while
    running a command is written to the log file and echoed to stdout.
    """

    # Seconds slept between two polls of the serial input queue in the
    # ``in_waiting`` property.
    poll_interval = 0.5

    # Text looked for in the console output after the password was sent; when
    # it shows up, login() gives up instead of waiting for a shell prompt.
    login_failed_marker = "Login incorrect"

    def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
        """Open the serial device and set up logging of the console output.

        device: passed unchanged to ``serial.Serial()``.
        username: account used by login() and expected in the shell prompt.
        log_file: file the console output is written to (handed to
            ``logging.FileHandler``).
        name: suffix of the child logger and name handed to TestFormatter.
        log_start_time, longest_machine_name: forwarded to TestFormatter.

        NOTE(review): ``name`` and ``log_file`` default to None but are used
        unconditionally below, so callers presumably always pass them.
        """
        self.buffer = b""
        # Compiled lazily by back_at_prompt()
        self.back_at_prompt_pattern = None
        self.username = username
        self.name = name
        self.log_file = log_file
        self.log = logger.getChild(name)
        self.log.setLevel(logging.INFO)
        self.con = serial.Serial(device)

        self.log_output = self.log.getChild("output")
        log_file_handler = logging.FileHandler(self.log_file)
        log_file_handler.setLevel(logging.INFO)
        # The console lines we log already end with their own line ending
        log_file_handler.terminator = ""
        formatter = TestFormatter(name=self.name,
                                    start_time=log_start_time,
                                    longest_machine_name=longest_machine_name)
        log_file_handler.setFormatter(formatter)
        self.log_output.addHandler(log_file_handler)

    @staticmethod
    def _decode(data):
        """Decode console bytes to str without ever raising.

        A serial console can deliver arbitrary bytes and a peek can cut a
        multi-byte character in half, so undecodable bytes are replaced
        instead of raising UnicodeDecodeError.
        """
        return data.decode(errors="replace")

    def read(self, size=1):
        """Consume and return size bytes, serving the internal buffer first."""
        if len(self.buffer) >= size:
            data = self.buffer[:size]
            # Keep the bytes we did not hand out
            self.buffer = self.buffer[size:]
            return data

        # The buffer is too short: take all of it and read the rest from
        # the serial connection
        data = self.buffer
        self.buffer = b""
        return data + self.con.read(size - len(data))

    def peek(self, size=1):
        """Return the next size bytes without consuming them.

        Missing bytes are read from the serial connection and appended to
        the internal buffer.
        """
        missing = size - len(self.buffer)
        if missing > 0:
            self.buffer += self.con.read(size=missing)

        return self.buffer[:size]

    def readline(self):
        """Consume and return one line including its trailing newline.

        Everything that is currently waiting on the serial connection is
        moved into the buffer first. If the buffer still contains no newline,
        the buffer content plus the result of the connection's own
        readline() is returned.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at {}".format(size))
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(data)
            self.log.debug(self.buffer)
            return data

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True if the pending console data is a shell prompt.

        A prompt is a line of the form ``[<username>@...]#``. Nothing is
        consumed, the data is only peeked at.
        """
        if self.peek() != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we get
        # not the complete string
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        if self.back_at_prompt_pattern is None:
            # The username is escaped so that characters like "." or "+"
            # are matched literally
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE)

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Write one console line to the output log and to stdout."""
        self.log.debug("Get in function log_console_line()")
        self.log_output.info(line)
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Number of bytes waiting on the serial connection once it settled.

        The input queue is polled every ``poll_interval`` seconds until two
        consecutive polls report the same number of bytes.
        """
        previous = 0
        sleep(self.poll_interval)
        current = self.con.in_waiting

        while current != previous:
            previous = current
            sleep(self.poll_interval)
            current = self.con.in_waiting

        return current

    def line_in_buffer(self):
        """Return True if the internal buffer holds at least one whole line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Log all complete lines that are available right now.

        The buffer is refilled and drained until a refill brings no further
        complete line. A trailing partial line stays in the buffer.
        """
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: {}".format(len(self.buffer)))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                data = self.readline()
                self.log_console_line(self._decode(data))

    def login(self, password):
        """Log in on the console with self.username and password.

        Returns True when a shell prompt is reached (or we already were
        logged in) and False when no username is set or the console reports
        ``login_failed_marker`` after the password was sent.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(self._decode(data))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines till we get login:
        login_pattern = re.compile(r"^.*login: ")
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting we get
            # not the complete string
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if login_pattern.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(data)
            self.log_console_line(self._decode(self.readline()))

        # We can login
        string = "{}\n".format(self.username)
        self.con.write(string.encode())
        self.con.flush()
        # read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:{}".format(data))
        self.log_console_line(self._decode(data))

        # We need to wait here till we get the full string "Password:".
        # The value is not needed, self.in_waiting just waits the correct
        # amount of time.
        _ = self.in_waiting

        string = "{}\n".format(password)
        self.con.write(string.encode())
        self.con.flush()
        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            line = self._decode(self.readline())
            self.log_console_line(line)
            # After a rejected password no shell prompt will ever show up,
            # so stop here instead of waiting forever
            if self.login_failed_marker in line:
                self.log.error("Login as {} failed".format(self.username))
                return False

        return True

    def write(self, string):
        """Encode string and send it over the serial connection."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run command in the shell on the console and return its exit code.

        The console output is logged line by line until the shell prompt is
        back. The exit code is returned as a string.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        # We saved our exit code in data (the last line)
        last_line = self._decode(data)
        self.log.debug(last_line)

        # Output of the command which does not end with a newline is glued
        # in front of "END: <code>", so pick the number behind the marker
        match = re.search(r"END: (\d+)\s*$", last_line)
        if match:
            exit_code = match.group(1)
        else:
            exit_code = last_line.replace("END: ", "").strip()

        self.log.debug(exit_code)
        return exit_code