]> git.ipfire.org Git - nitsi.git/blob - src/nitsi/serial_connection.py
3e65225b6ff7218c3715b57790b28a03dc92f72c
[nitsi.git] / src / nitsi / serial_connection.py
1 #!/usr/bin/python3
2
3 import logging
4 import os
5 import re
6 import serial
7 import sys
8 import time
9
10 from . import logger
11
12 log = logging.getLogger("nitsi.serial")
13
14 class serial_connection():
15 def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
16 self.buffer = b""
17 self.back_at_prompt_pattern = None
18 self.username = username
19 self.name = name
20 self.log_file = log_file
21 self.log = log.getChild(name)
22 self.log.setLevel(logging.INFO)
23 self.con = serial.Serial(device)
24
25 self.log_output = self.log.getChild("output")
26 # Do not propagate the output to ancestor loggers as it looks ugly
27 self.log_output.propagate = False
28 # Logging handler for file
29 log_file_handler = logging.FileHandler(self.log_file)
30 log_file_handler.setLevel(logging.INFO)
31 log_file_handler.terminator = ""
32 # Loggin Handler for Stream
33 stream_handler = logging.StreamHandler()
34 stream_handler.setLevel(logging.INFO)
35 stream_handler.terminator = ""
36 formatter = logger.TestFormatter(name=self.name,
37 start_time=log_start_time,
38 longest_machine_name=longest_machine_name)
39 log_file_handler.setFormatter(formatter)
40 stream_handler.setFormatter(formatter)
41 self.log_output.addHandler(log_file_handler)
42 self.log_output.addHandler(stream_handler)
43
44 def read(self, size=1):
45 if len(self.buffer) >= size:
46 # throw away first size bytes in buffer
47 data = self.buffer[:size]
48 # Set the buffer to the non used bytes
49 self.buffer = self.buffer[size:]
50 return data
51 else:
52 data = self.buffer
53 # Set the size to the value we have to read now
54 size = size - len(self.buffer)
55 # Set the buffer empty
56 self.buffer = b""
57 return data + self.con.read(size)
58
59 def peek(self, size=1):
60 if len(self.buffer) <= size:
61 self.buffer += self.con.read(size=size - len(self.buffer))
62
63 return self.buffer[:size]
64
65 def readline(self):
66 self.log.debug(self.buffer)
67 self.buffer = self.buffer + self.con.read(self.con.in_waiting)
68 if b"\n" in self.buffer:
69 size = self.buffer.index(b"\n") + 1
70 self.log.debug("We have a whole line in the buffer")
71 self.log.debug(self.buffer)
72 self.log.debug("We split at {}".format(size))
73 data = self.buffer[:size]
74 self.buffer = self.buffer[size:]
75 self.log.debug(data)
76 self.log.debug(self.buffer)
77 return data
78
79 data = self.buffer
80 self.buffer = b""
81 return data + self.con.readline()
82
83 def back_at_prompt(self):
84 data = self.peek()
85 if not data == b"[":
86 return False
87
88 # We need to use self.in_waiting because with self.con.in_waiting we get
89 # not the complete string
90 size = len(self.buffer) + self.in_waiting
91 data = self.peek(size)
92
93
94 if self.back_at_prompt_pattern == None:
95 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
96 self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
97
98 if self.back_at_prompt_pattern.search(data.decode()):
99 return True
100 else:
101 return False
102
103 def log_console_line(self, line):
104 self.log.debug("Get in function log_console_line()")
105 self.log_output.info(line)
106 #sys.stdout.write(line)
107
108 @property
109 def in_waiting(self):
110 in_waiting_before = 0
111 time.sleep(0.5)
112
113 while in_waiting_before != self.con.in_waiting:
114 in_waiting_before = self.con.in_waiting
115 time.sleep(0.5)
116
117 return self.con.in_waiting
118
119 def line_in_buffer(self):
120 if b"\n" in self.buffer:
121 return True
122
123 return False
124
125 def print_lines_in_buffer(self):
126 while True:
127 self.log.debug("Fill buffer ...")
128 self.peek(len(self.buffer) + self.in_waiting)
129 self.log.debug("Current buffer length: {}".format(len(self.buffer)))
130 if self.line_in_buffer() == True:
131 while self.line_in_buffer() == True:
132 data = self.readline()
133 self.log_console_line(data.decode())
134 else:
135 self.log.debug("We have printed all lines in the buffer")
136 break
137
138 def login(self, password):
139 if self.username == None:
140 self.log.error("Username cannot be blank")
141 return False
142
143 self.print_lines_in_buffer()
144
145 # Hit enter to see what we get
146 self.con.write(b'\n')
147 # We get two new lines \r\n ?
148 data = self.readline()
149 self.log_console_line(data.decode())
150
151 self.print_lines_in_buffer()
152
153 if self.back_at_prompt():
154 self.log.debug("We are already logged in.")
155 return True
156
157 # Read all line till we get login:
158 while 1:
159 # We need to use self.in_waiting because with self.con.in_waiting we get
160 # not the complete string
161 size = len(self.buffer) + self.in_waiting
162 data = self.peek(size)
163
164 pattern = r"^.*login: "
165 pattern = re.compile(pattern)
166
167 if pattern.search(data.decode()):
168 break
169 else:
170 self.log.debug("The pattern does not match")
171 self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
172 self.log_console_line(self.readline().decode())
173
174 # We can login
175 string = "{}\n".format(self.username)
176 self.con.write(string.encode())
177 self.con.flush()
178 # read the login out of the buffer
179 data = self.readline()
180 self.log.debug("This is the login:{}".format(data))
181 self.log_console_line(data.decode())
182
183 # We need to wait her till we get the full string "Password:"
184 #This is useless but self.in_waiting will wait the correct amount of time
185 size = self.in_waiting
186
187 string = "{}\n".format(password)
188 self.con.write(string.encode())
189 self.con.flush()
190 # Print the 'Password:' line
191 data = self.readline()
192 self.log_console_line(data.decode())
193
194 while not self.back_at_prompt():
195 # This will fail if the login failed so we need to look for the failed keyword
196 data = self.readline()
197 self.log_console_line(data.decode())
198
199 return True
200
201 def write(self, string):
202 self.log.debug(string)
203 self.con.write(string.encode())
204 self.con.flush()
205
206 def command(self, command):
207 self.write("{}; echo \"END: $?\"\n".format(command))
208
209 # We need to read out the prompt for this command first
210 # If we do not do this we will break the loop immediately
211 # because the prompt for this command is still in the buffer
212 data = self.readline()
213 self.log_console_line(data.decode())
214
215 while not self.back_at_prompt():
216 data = self.readline()
217 self.log_console_line(data.decode())
218
219 # We saved our exit code in data (the last line)
220 self.log.debug(data.decode())
221 data = data.decode().replace("END: ", "")
222 self.log.debug(data)
223 self.log.debug(data.strip())
224 return data.strip()