]> git.ipfire.org Git - nitsi.git/blame - src/nitsi/serial_connection.py
Use one time to which all serial connection output is relativ
[nitsi.git] / src / nitsi / serial_connection.py
CommitLineData
f0f16021
JS
1#!/usr/bin/python3
2import serial
3
4import re
1ed8ca9f 5import os
f0f16021
JS
6
7from time import sleep
8import sys
1ed8ca9f
JS
9import logging
10
b4936764
JS
11from nitsi.logger import TestFormatter
12
1ed8ca9f 13logger = logging.getLogger("nitsi.serial")
f0f16021
JS
14
15class serial_connection():
6c352a80 16 def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None):
f0f16021
JS
17 self.buffer = b""
18 self.back_at_prompt_pattern = None
19 self.username = username
b4936764
JS
20 self.name = name
21 self.log_file = log_file
22 self.log = logger.getChild(name)
5045ec58 23 self.log.setLevel(logging.INFO)
f0f16021
JS
24 self.con = serial.Serial(device)
25
b4936764
JS
26 self.log_output = self.log.getChild("output")
27 log_file_handler = logging.FileHandler(self.log_file)
28 log_file_handler.setLevel(logging.INFO)
29 log_file_handler.terminator = ""
6c352a80 30 formatter = TestFormatter(name=self.name, start_time=log_start_time)
b4936764
JS
31 log_file_handler.setFormatter(formatter)
32 self.log_output.addHandler(log_file_handler)
33
f0f16021
JS
34 def read(self, size=1):
35 if len(self.buffer) >= size:
36 # throw away first size bytes in buffer
37 data = self.buffer[:size]
38 # Set the buffer to the non used bytes
39 self.buffer = self.buffer[size:]
40 return data
41 else:
42 data = self.buffer
43 # Set the size to the value we have to read now
44 size = size - len(self.buffer)
45 # Set the buffer empty
46 self.buffer = b""
47 return data + self.con.read(size)
48
49 def peek(self, size=1):
50 if len(self.buffer) <= size:
51 self.buffer += self.con.read(size=size - len(self.buffer))
52
53 return self.buffer[:size]
54
55 def readline(self):
56 self.log.debug(self.buffer)
57 self.buffer = self.buffer + self.con.read(self.con.in_waiting)
58 if b"\n" in self.buffer:
59 size = self.buffer.index(b"\n") + 1
60 self.log.debug("We have a whole line in the buffer")
61 self.log.debug(self.buffer)
62 self.log.debug("We split at {}".format(size))
63 data = self.buffer[:size]
64 self.buffer = self.buffer[size:]
65 self.log.debug(data)
66 self.log.debug(self.buffer)
67 return data
68
69 data = self.buffer
70 self.buffer = b""
71 return data + self.con.readline()
72
73 def back_at_prompt(self):
74 data = self.peek()
75 if not data == b"[":
76 return False
77
78 # We need to use self.in_waiting because with self.con.in_waiting we get
79 # not the complete string
80 size = len(self.buffer) + self.in_waiting
81 data = self.peek(size)
82
83
84 if self.back_at_prompt_pattern == None:
85 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
86 self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
87
88 if self.back_at_prompt_pattern.search(data.decode()):
89 return True
90 else:
91 return False
92
93 def log_console_line(self, line):
94 self.log.debug("Get in function log_console_line()")
b4936764 95 self.log_output.info(line)
f0f16021
JS
96 sys.stdout.write(line)
97
98 @property
99 def in_waiting(self):
100 in_waiting_before = 0
101 sleep(0.5)
102
103 while in_waiting_before != self.con.in_waiting:
104 in_waiting_before = self.con.in_waiting
105 sleep(0.5)
106
107 return self.con.in_waiting
108
109 def line_in_buffer(self):
110 if b"\n" in self.buffer:
111 return True
112
113 return False
114
115 def print_lines_in_buffer(self):
116 while True:
117 self.log.debug("Fill buffer ...")
118 self.peek(len(self.buffer) + self.in_waiting)
119 self.log.debug("Current buffer length: {}".format(len(self.buffer)))
120 if self.line_in_buffer() == True:
121 while self.line_in_buffer() == True:
122 data = self.readline()
123 self.log_console_line(data.decode())
124 else:
125 self.log.debug("We have printed all lines in the buffer")
126 break
127
128 def login(self, password):
129 if self.username == None:
130 self.log.error("Username cannot be blank")
131 return False
132
133 self.print_lines_in_buffer()
134
135 # Hit enter to see what we get
136 self.con.write(b'\n')
137 # We get two new lines \r\n ?
138 data = self.readline()
139 self.log_console_line(data.decode())
140
141 self.print_lines_in_buffer()
142
143 if self.back_at_prompt():
144 self.log.debug("We are already logged in.")
145 return True
146
147 # Read all line till we get login:
148 while 1:
149 # We need to use self.in_waiting because with self.con.in_waiting we get
150 # not the complete string
151 size = len(self.buffer) + self.in_waiting
152 data = self.peek(size)
153
154 pattern = r"^.*login: "
155 pattern = re.compile(pattern)
156
157 if pattern.search(data.decode()):
158 break
159 else:
160 self.log.debug("The pattern does not match")
161 self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
162 self.log_console_line(self.readline().decode())
163
164 # We can login
165 string = "{}\n".format(self.username)
166 self.con.write(string.encode())
167 self.con.flush()
168 # read the login out of the buffer
169 data = self.readline()
170 self.log.debug("This is the login:{}".format(data))
171 self.log_console_line(data.decode())
172
173 # We need to wait her till we get the full string "Password:"
174 #This is useless but self.in_waiting will wait the correct amount of time
175 size = self.in_waiting
176
177 string = "{}\n".format(password)
178 self.con.write(string.encode())
179 self.con.flush()
180 # Print the 'Password:' line
181 data = self.readline()
182 self.log_console_line(data.decode())
183
184 while not self.back_at_prompt():
185 # This will fail if the login failed so we need to look for the failed keyword
186 data = self.readline()
187 self.log_console_line(data.decode())
188
189 return True
190
191 def write(self, string):
192 self.log.debug(string)
193 self.con.write(string.encode())
194 self.con.flush()
195
196 def command(self, command):
197 self.write("{}; echo \"END: $?\"\n".format(command))
198
199 # We need to read out the prompt for this command first
200 # If we do not do this we will break the loop immediately
201 # because the prompt for this command is still in the buffer
202 data = self.readline()
203 self.log_console_line(data.decode())
204
205 while not self.back_at_prompt():
206 data = self.readline()
207 self.log_console_line(data.decode())
208
209 # We saved our exit code in data (the last line)
210 self.log.debug(data.decode())
211 data = data.decode().replace("END: ", "")
212 self.log.debug(data)
213 self.log.debug(data.strip())
214 return data.strip()