]> git.ipfire.org Git - nitsi.git/blame_incremental - src/nitsi/serial_connection.py
Format the name in the log file always with the same length
[nitsi.git] / src / nitsi / serial_connection.py
... / ...
CommitLineData
1#!/usr/bin/python3
2import serial
3
4import re
5import os
6
7from time import sleep
8import sys
9import logging
10
11from nitsi.logger import TestFormatter
12
13logger = logging.getLogger("nitsi.serial")
14
15class serial_connection():
16 def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
17 self.buffer = b""
18 self.back_at_prompt_pattern = None
19 self.username = username
20 self.name = name
21 self.log_file = log_file
22 self.log = logger.getChild(name)
23 self.log.setLevel(logging.INFO)
24 self.con = serial.Serial(device)
25
26 self.log_output = self.log.getChild("output")
27 log_file_handler = logging.FileHandler(self.log_file)
28 log_file_handler.setLevel(logging.INFO)
29 log_file_handler.terminator = ""
30 formatter = TestFormatter(name=self.name,
31 start_time=log_start_time,
32 longest_machine_name=longest_machine_name)
33 log_file_handler.setFormatter(formatter)
34 self.log_output.addHandler(log_file_handler)
35
36 def read(self, size=1):
37 if len(self.buffer) >= size:
38 # throw away first size bytes in buffer
39 data = self.buffer[:size]
40 # Set the buffer to the non used bytes
41 self.buffer = self.buffer[size:]
42 return data
43 else:
44 data = self.buffer
45 # Set the size to the value we have to read now
46 size = size - len(self.buffer)
47 # Set the buffer empty
48 self.buffer = b""
49 return data + self.con.read(size)
50
51 def peek(self, size=1):
52 if len(self.buffer) <= size:
53 self.buffer += self.con.read(size=size - len(self.buffer))
54
55 return self.buffer[:size]
56
57 def readline(self):
58 self.log.debug(self.buffer)
59 self.buffer = self.buffer + self.con.read(self.con.in_waiting)
60 if b"\n" in self.buffer:
61 size = self.buffer.index(b"\n") + 1
62 self.log.debug("We have a whole line in the buffer")
63 self.log.debug(self.buffer)
64 self.log.debug("We split at {}".format(size))
65 data = self.buffer[:size]
66 self.buffer = self.buffer[size:]
67 self.log.debug(data)
68 self.log.debug(self.buffer)
69 return data
70
71 data = self.buffer
72 self.buffer = b""
73 return data + self.con.readline()
74
75 def back_at_prompt(self):
76 data = self.peek()
77 if not data == b"[":
78 return False
79
80 # We need to use self.in_waiting because with self.con.in_waiting we get
81 # not the complete string
82 size = len(self.buffer) + self.in_waiting
83 data = self.peek(size)
84
85
86 if self.back_at_prompt_pattern == None:
87 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
88 self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
89
90 if self.back_at_prompt_pattern.search(data.decode()):
91 return True
92 else:
93 return False
94
95 def log_console_line(self, line):
96 self.log.debug("Get in function log_console_line()")
97 self.log_output.info(line)
98 sys.stdout.write(line)
99
100 @property
101 def in_waiting(self):
102 in_waiting_before = 0
103 sleep(0.5)
104
105 while in_waiting_before != self.con.in_waiting:
106 in_waiting_before = self.con.in_waiting
107 sleep(0.5)
108
109 return self.con.in_waiting
110
111 def line_in_buffer(self):
112 if b"\n" in self.buffer:
113 return True
114
115 return False
116
117 def print_lines_in_buffer(self):
118 while True:
119 self.log.debug("Fill buffer ...")
120 self.peek(len(self.buffer) + self.in_waiting)
121 self.log.debug("Current buffer length: {}".format(len(self.buffer)))
122 if self.line_in_buffer() == True:
123 while self.line_in_buffer() == True:
124 data = self.readline()
125 self.log_console_line(data.decode())
126 else:
127 self.log.debug("We have printed all lines in the buffer")
128 break
129
130 def login(self, password):
131 if self.username == None:
132 self.log.error("Username cannot be blank")
133 return False
134
135 self.print_lines_in_buffer()
136
137 # Hit enter to see what we get
138 self.con.write(b'\n')
139 # We get two new lines \r\n ?
140 data = self.readline()
141 self.log_console_line(data.decode())
142
143 self.print_lines_in_buffer()
144
145 if self.back_at_prompt():
146 self.log.debug("We are already logged in.")
147 return True
148
149 # Read all line till we get login:
150 while 1:
151 # We need to use self.in_waiting because with self.con.in_waiting we get
152 # not the complete string
153 size = len(self.buffer) + self.in_waiting
154 data = self.peek(size)
155
156 pattern = r"^.*login: "
157 pattern = re.compile(pattern)
158
159 if pattern.search(data.decode()):
160 break
161 else:
162 self.log.debug("The pattern does not match")
163 self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
164 self.log_console_line(self.readline().decode())
165
166 # We can login
167 string = "{}\n".format(self.username)
168 self.con.write(string.encode())
169 self.con.flush()
170 # read the login out of the buffer
171 data = self.readline()
172 self.log.debug("This is the login:{}".format(data))
173 self.log_console_line(data.decode())
174
175 # We need to wait her till we get the full string "Password:"
176 #This is useless but self.in_waiting will wait the correct amount of time
177 size = self.in_waiting
178
179 string = "{}\n".format(password)
180 self.con.write(string.encode())
181 self.con.flush()
182 # Print the 'Password:' line
183 data = self.readline()
184 self.log_console_line(data.decode())
185
186 while not self.back_at_prompt():
187 # This will fail if the login failed so we need to look for the failed keyword
188 data = self.readline()
189 self.log_console_line(data.decode())
190
191 return True
192
193 def write(self, string):
194 self.log.debug(string)
195 self.con.write(string.encode())
196 self.con.flush()
197
198 def command(self, command):
199 self.write("{}; echo \"END: $?\"\n".format(command))
200
201 # We need to read out the prompt for this command first
202 # If we do not do this we will break the loop immediately
203 # because the prompt for this command is still in the buffer
204 data = self.readline()
205 self.log_console_line(data.decode())
206
207 while not self.back_at_prompt():
208 data = self.readline()
209 self.log_console_line(data.decode())
210
211 # We saved our exit code in data (the last line)
212 self.log.debug(data.decode())
213 data = data.decode().replace("END: ", "")
214 self.log.debug(data)
215 self.log.debug(data.strip())
216 return data.strip()