]> git.ipfire.org Git - nitsi.git/blob - src/nitsi/serial_connection.py
Rework the connection handling of the serial console
[nitsi.git] / src / nitsi / serial_connection.py
1 #!/usr/bin/python3
2
3 import logging
4 import os
5 import re
6 import serial
7 import sys
8 import time
9
10 from . import logger
11
# Module-level logger; every SerialConnection derives its own child logger from it.
log = logging.getLogger("nitsi.serial")
13
class SerialConnection():
    """Buffered, line-oriented access to a machine's serial console.

    The class wraps a ``serial.Serial`` port, keeps bytes that were read
    ahead in ``self.buffer`` and offers helpers to log in, run a shell
    command and detect the shell prompt.  Everything read from the console
    is forwarded to the ``output`` child logger (a stream handler and,
    when ``log_file`` is given, a file handler).
    """

    # Seconds to sleep between two polls of the port's ``in_waiting``
    # counter while waiting for the console output to settle.
    SETTLE_TIME = 0.5

    # Matches a buffer whose first line contains the "login: " prompt.
    # Compiled once instead of on every iteration of the login loop.
    _LOGIN_PROMPT = re.compile(r"^.*login: ")

    def __init__(self, device, username=None, password=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
        """Prepare a (still closed) connection to ``device``.

        device -- port name handed to ``serial.Serial``.
        username, password -- credentials used by ``login()``.
        log_file -- path of the console log; no file handler is attached
            when it is ``None``.
        name -- suffix of the child logger and name handed to the
            formatter.  NOTE(review): ``Logger.getChild`` needs a string,
            so callers have to pass a name despite the ``None`` default.
        log_start_time, longest_machine_name -- passed through to
            ``logger.TestFormatter``.
        """
        self.buffer = b""
        self.back_at_prompt_pattern = None
        self.username = username
        self.password = password
        self.name = name
        self.device = device
        self.log_file = log_file
        self.log = log.getChild(name)
        self.log.setLevel(logging.INFO)

        # Create a closed serial connection and set the port in a second
        # step, so that the connection is not brought up automatically.
        self.con = serial.Serial()
        self.con.port = self.device

        self.log_output = self.log.getChild("output")
        # Do not propagate the console output to ancestor loggers as it
        # looks ugly there.
        self.log_output.propagate = False

        formatter = logger.TestFormatter(name=self.name,
                                         start_time=log_start_time,
                                         longest_machine_name=longest_machine_name)

        # File handler first (only if a log file was requested), then the
        # stream handler.
        handlers = []
        if self.log_file is not None:
            handlers.append(logging.FileHandler(self.log_file))
        handlers.append(logging.StreamHandler())

        for handler in handlers:
            handler.setLevel(logging.INFO)
            # Console lines already carry their own line ending.
            handler.terminator = ""
            handler.setFormatter(formatter)
            self.log_output.addHandler(handler)

    @staticmethod
    def _decode(data):
        """Decode console bytes, replacing undecodable bytes.

        Serial output may be cut in the middle of a multi-byte character
        or contain binary noise, so a strict decode could raise.
        """
        return data.decode(errors="replace")

    def connect(self):
        """Open the serial port if necessary and log in."""
        if self.con.is_open:
            self.log.debug("Connection to the serial port is open")
        else:
            self.log.debug("Connection to the serial port is closed, try to open it.")
            self.con.open()
        # login() detects on its own when we are already logged in.
        self.login()

    def disconnect(self):
        """Close the serial port if it is open."""
        if not self.con.is_open:
            self.log.debug("Connection to the serial port is already closed")
        else:
            self.log.debug("Connection to the serial port is open, try to close it.")
            self.con.close()

    def read(self, size=1):
        """Return ``size`` bytes, serving buffered bytes first."""
        if len(self.buffer) >= size:
            data, self.buffer = self.buffer[:size], self.buffer[size:]
            return data

        # The buffer does not hold enough: use it up and read the rest
        # from the port.
        data, self.buffer = self.buffer, b""
        return data + self.con.read(size - len(data))

    def peek(self, size=1):
        """Return up to ``size`` bytes without consuming them.

        Bytes missing in the buffer are read from the port and appended
        to the buffer.
        """
        missing = size - len(self.buffer)
        if missing > 0:
            self.buffer += self.con.read(size=missing)

        return self.buffer[:size]

    def readline(self):
        """Return the next line (including its ``\\n``) and consume it.

        Pending bytes are pulled into the buffer first.  If the buffer
        holds no complete line, the buffer content plus the result of the
        port's own ``readline()`` is returned.
        """
        self.buffer += self.con.read(self.con.in_waiting)

        line, newline, rest = self.buffer.partition(b"\n")
        if newline:
            self.log.debug("We have a whole line in the buffer")
            self.buffer = rest
            return line + newline

        data, self.buffer = self.buffer, b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True if the buffer holds only a shell prompt.

        The prompt is expected to look like ``[<username>@...]#``.
        """
        self.log.debug("Check if we are back at prompt")
        first = self.peek()
        self.log.debug("First char in buffer is: '{}'".format(self._decode(first)))
        if first != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting
        # we do not get the complete string.
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)
        self.log.debug("Data is: '{}'".format(data))

        # A \n in the buffer means that there is still a line to consume,
        # so we are not at the prompt.
        if self.line_in_buffer():
            return False

        if self.back_at_prompt_pattern is None:
            # Escape the username so that characters like '.' or '+' are
            # matched literally.
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE)

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Send one console line to the output logger."""
        self.log.debug("Get in function log_console_line()")
        self.log_output.info(line)

    @property
    def in_waiting(self):
        """Number of pending bytes once the port's input has settled.

        Polls ``self.con.in_waiting`` every ``SETTLE_TIME`` seconds until
        two consecutive polls agree; sleeps at least once.
        """
        previous = 0
        time.sleep(self.SETTLE_TIME)

        while previous != self.con.in_waiting:
            previous = self.con.in_waiting
            time.sleep(self.SETTLE_TIME)

        return self.con.in_waiting

    def line_in_buffer(self):
        """Return True if the buffer contains at least one ``\\n``."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Log all complete lines until no complete line is left."""
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: {}".format(len(self.buffer)))

            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                self.log_console_line(self._decode(self.readline()))

    def login(self):
        """Log in on the console.

        Returns False if no username is set, True once the prompt has
        been reached (also when we were already logged in).
        NOTE(review): a failed login is not detected; the final loop then
        keeps reading lines.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        self.log_console_line(self._decode(self.readline()))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Consume lines until the first line in the buffer is the
        # "login: " prompt.
        while True:
            # We need to use self.in_waiting because with
            # self.con.in_waiting we do not get the complete string.
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if self._LOGIN_PROMPT.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            # Kept from the original: this peek waits for the input to
            # settle again before the line is consumed.
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self._decode(self.readline()))

        # We can login
        self.con.write("{}\n".format(self.username).encode())
        self.con.flush()
        # Read the login line out of the buffer
        data = self.readline()
        self.log.debug("This is the login:{}".format(data))
        self.log_console_line(self._decode(data))

        # The value is not needed; reading the property waits until the
        # full "Password:" string has arrived.
        _ = self.in_waiting

        self.con.write("{}\n".format(self.password).encode())
        self.con.flush()
        # Print the 'Password:' line
        self.log_console_line(self._decode(self.readline()))

        while not self.back_at_prompt():
            self.log_console_line(self._decode(self.readline()))

        return True

    def write(self, string):
        """Encode ``string``, write it to the port and flush."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run ``command`` in the shell and return its exit code as string.

        ``; echo "END: $?"`` is appended to the command, so the last line
        printed before the prompt returns carries the exit code.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # Read out the line holding the prompt and the echoed command
        # first; otherwise the loop below would stop immediately because
        # that prompt is still in the buffer.
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        self.log.debug("We get a prompt so we should be in the last line")

        # The last line read holds the exit code.
        exit_code = self._decode(data).replace("END: ", "").strip()
        self.log.debug(exit_code)
        return exit_code