git.ipfire.org Git - nitsi.git/blame - src/nitsi/serial_connection.py
Fix in back_at_prompt check
[nitsi.git] / src / nitsi / serial_connection.py
CommitLineData
f0f16021 1#!/usr/bin/python3
f0f16021 2
6632e137 3import logging
1ed8ca9f 4import os
6632e137
JS
5import re
6import serial
f0f16021 7import sys
6632e137 8import time
1ed8ca9f 9
b560f31a 10from . import logger
b4936764 11
b560f31a 12log = logging.getLogger("nitsi.serial")
f0f16021 13
class SerialConnection():
    """Drive a login shell on a serial port and log what it prints.

    Bytes that were read ahead from the port are kept in ``self.buffer`` so
    that pending output can be inspected (for example to recognise the shell
    prompt) without consuming it. Console lines that are consumed are passed
    to the ``output`` child logger, which writes them to ``log_file`` and to
    a stream handler.
    """

    # Seconds slept between two samples of the port's pending-byte counter in
    # ``in_waiting``. A class attribute so it can be overridden per instance.
    settle_delay = 0.5

    # Looked for in the console output after the password was sent.
    # NOTE(review): this is the message printed by the common login(1)
    # implementations after rejected credentials - confirm for the target
    # system.
    login_failed_marker = b"Login incorrect"

    def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
        """Open the serial port and set up logging of the console output.

        device: passed unchanged to ``serial.Serial``.
        username: account used by ``login()`` and expected in the prompt.
        log_file: path handed to ``logging.FileHandler``.
        name: suffix of the child logger; also given to the formatter.
        log_start_time, longest_machine_name: forwarded to
            ``logger.TestFormatter``.
        """
        self.buffer = b""
        # Compiled lazily by back_at_prompt()
        self.back_at_prompt_pattern = None
        self.username = username
        self.name = name
        self.log_file = log_file
        self.log = log.getChild(name)
        self.log.setLevel(logging.INFO)

        self.con = serial.Serial(device)

        self.log_output = self.log.getChild("output")
        # Do not propagate the output to ancestor loggers as it looks ugly
        self.log_output.propagate = False

        # Logging handler for file. The console lines already end with a
        # newline, so the handlers must not append another one.
        log_file_handler = logging.FileHandler(self.log_file)
        log_file_handler.setLevel(logging.INFO)
        log_file_handler.terminator = ""

        # Logging handler for stream
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(logging.INFO)
        stream_handler.terminator = ""

        formatter = logger.TestFormatter(name=self.name,
                                         start_time=log_start_time,
                                         longest_machine_name=longest_machine_name)
        log_file_handler.setFormatter(formatter)
        stream_handler.setFormatter(formatter)
        self.log_output.addHandler(log_file_handler)
        self.log_output.addHandler(stream_handler)

    @staticmethod
    def _decode(data):
        """Decode console bytes without raising on undecodable input.

        Undecodable bytes are substituted with U+FFFD, so line noise or a
        character split across two reads cannot abort the caller.
        """
        return data.decode(errors="replace")

    def read(self, size=1):
        """Consume and return *size* bytes, buffered bytes first.

        Whatever the buffer cannot provide is read from the port.
        """
        if len(self.buffer) >= size:
            data = self.buffer[:size]
            # Keep the bytes which were not requested
            self.buffer = self.buffer[size:]
            return data

        data = self.buffer
        self.buffer = b""
        # Only the missing part has to come from the port
        return data + self.con.read(size - len(data))

    def peek(self, size=1):
        """Return the first *size* bytes without consuming them.

        The buffer is topped up from the port when it holds fewer than
        *size* bytes.
        """
        missing = size - len(self.buffer)
        if missing > 0:
            self.buffer += self.con.read(missing)

        return self.buffer[:size]

    def readline(self):
        """Consume and return one line including its trailing newline.

        Bytes already waiting on the port are pulled into the buffer first.
        If the buffer then holds no complete line, the buffered bytes are
        returned together with the result of the port's own ``readline()``.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at %s", size)
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(data)
            self.log.debug(self.buffer)
            return data

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True if the pending output is only the shell prompt.

        The prompt is expected to look like ``[<username>@<anything>]#``.
        Nothing is consumed from the buffer.
        """
        self.log.debug("Check if we are back at prompt")
        data = self.peek()
        # A single byte may be one half of a multi-byte character, so it
        # must not be decoded strictly
        self.log.debug("First char in buffer is: '%s'", self._decode(data))

        if data != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)
        self.log.debug("Data is: '%s'", data)

        # When we have a \n in the buffer we are not at the prompt,
        # instead we still have a line in the buffer
        if self.line_in_buffer():
            return False

        if self.back_at_prompt_pattern is None:
            # The username is literal text, not a regular expression
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE)

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Send one console line to the output logger."""
        self.log.debug("Get in function log_console_line()")
        self.log_output.info(line)

    @property
    def in_waiting(self):
        """Number of bytes waiting on the port once the count has settled.

        The port's counter is sampled every ``settle_delay`` seconds until
        two consecutive samples are equal.
        """
        in_waiting_before = 0
        time.sleep(self.settle_delay)

        while in_waiting_before != self.con.in_waiting:
            in_waiting_before = self.con.in_waiting
            time.sleep(self.settle_delay)

        return self.con.in_waiting

    def line_in_buffer(self):
        """Return True if the buffer contains a newline."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Log every complete line that is currently available."""
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: %s", len(self.buffer))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                data = self.readline()
                self.log_console_line(self._decode(data))

    def login(self, password):
        """Log in as ``self.username`` with *password*.

        Returns True when the shell prompt is reached (also when it was
        already there) and False when no username is set or the console
        reports ``login_failed_marker`` after the password was sent.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(self._decode(data))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines till we get login:
        pattern = re.compile(r"^.*login: ")
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting
            # we do not get the complete string
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if pattern.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(data)
            self.log_console_line(self._decode(self.readline()))

        # We can login
        string = "{}\n".format(self.username)
        self.con.write(string.encode())
        self.con.flush()
        # Read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:%s", data)
        self.log_console_line(self._decode(data))

        # We need to wait here till we get the full string "Password:".
        # The value is not needed, but self.in_waiting waits the correct
        # amount of time.
        _ = self.in_waiting

        string = "{}\n".format(password)
        self.con.write(string.encode())
        self.con.flush()
        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))
            # After rejected credentials no shell prompt will ever show up,
            # so stop here instead of waiting for it
            if self.login_failed_marker in data:
                self.log.error("Login failed")
                return False

        return True

    def write(self, string):
        """Encode *string* and send it to the port."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run *command* in the shell and log its output.

        ``; echo "END: $?"`` is appended to the command. The return value is
        the last line read before the prompt came back, with ``END: ``
        removed and surrounding whitespace stripped.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        self.log.debug("We get a prompt so we should be in the last line")

        # We saved our exit code in data (the last line)
        last_line = self._decode(data)
        self.log.debug(last_line)
        last_line = last_line.replace("END: ", "")
        self.log.debug(last_line)
        self.log.debug(last_line.strip())
        return last_line.strip()