]> git.ipfire.org Git - people/ms/nitsi.git/blob - src/nitsi/serial_connection.py
Makefile: Ship nitsi.in in tarball
[people/ms/nitsi.git] / src / nitsi / serial_connection.py
1 #!/usr/bin/python3
2
3 import logging
4 import os
5 import re
6 import serial
7 import sys
8 import time
9
10 from . import logger
11
# Parent logger for every serial connection; each connection derives its own
# child logger from this one.
log = logging.getLogger("nitsi.serial")
13
class serial_connection:
    """Buffered, line-oriented access to a shell behind a serial console.

    The class wraps a ``serial.Serial`` port and keeps its own byte buffer
    (``self.buffer``) so that data can be peeked at without being consumed.
    Everything read from the console can be echoed to stdout and to a log
    file via :meth:`log_console_line`.
    """

    # Matches a login prompt on the first line of the inspected data
    # (no MULTILINE flag, so ``^`` only anchors at the very beginning).
    _LOGIN_PROMPT_PATTERN = re.compile(r"^.*login: ")

    # NOTE(review): conventional message printed by login(1) when the
    # credentials are rejected - confirm against the images under test.
    _LOGIN_FAILED_MARKER = "Login incorrect"

    def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
        """Open *device* and set up logging for this connection.

        Args:
            device: Port passed straight to ``serial.Serial``.
            username: Account used by :meth:`login` and expected in the
                shell prompt (``[username@...]#``).
            log_file: Path handed to ``logging.FileHandler``. Despite the
                ``None`` default a real path is required.
            name: Suffix for the child logger. Despite the ``None`` default
                a string is required by ``Logger.getChild``.
            log_start_time: Forwarded to ``logger.TestFormatter``.
            longest_machine_name: Forwarded to ``logger.TestFormatter``.
        """
        self.buffer = b""
        # Compiled lazily by back_at_prompt().
        self.back_at_prompt_pattern = None
        self.username = username
        self.name = name
        self.log_file = log_file
        self.log = log.getChild(name)
        self.log.setLevel(logging.INFO)
        self.con = serial.Serial(device)

        # Console output goes to a dedicated child logger with a file handler.
        self.log_output = self.log.getChild("output")
        log_file_handler = logging.FileHandler(self.log_file)
        log_file_handler.setLevel(logging.INFO)
        # Logged console lines normally end in their own newline, so the
        # handler must not append another one.
        log_file_handler.terminator = ""
        formatter = logger.TestFormatter(name=self.name,
                                         start_time=log_start_time,
                                         longest_machine_name=longest_machine_name)
        log_file_handler.setFormatter(formatter)
        self.log_output.addHandler(log_file_handler)

    @staticmethod
    def _decode(data):
        """Decode console bytes, replacing undecodable bytes instead of raising."""
        return data.decode(errors="replace")

    def read(self, size=1):
        """Consume and return *size* bytes, serving buffered bytes first.

        Bytes missing from the buffer are read from the port, which blocks
        according to the port's timeout settings.
        """
        buffered = self.buffer[:size]
        self.buffer = self.buffer[size:]

        missing = size - len(buffered)
        if missing > 0:
            return buffered + self.con.read(missing)

        return buffered

    def peek(self, size=1):
        """Return the next *size* bytes without consuming them.

        The buffer is topped up from the port only when it holds fewer than
        *size* bytes, so a peek that can be served from the buffer never
        touches the port.
        """
        missing = size - len(self.buffer)
        if missing > 0:
            self.buffer += self.con.read(size=missing)

        return self.buffer[:size]

    def readline(self):
        """Consume and return one line, including its trailing newline.

        Pending input is pulled into the buffer first. If the buffer then
        holds no complete line, the buffer content is returned together with
        whatever ``serial.Serial.readline`` delivers.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)

        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at %s", size)
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(data)
            self.log.debug(self.buffer)
            return data

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True when the pending data contains the shell prompt.

        The prompt is expected to look like ``[username@...]#`` at the start
        of a line. Nothing is consumed from the buffer.
        """
        # Cheap pre-check: the prompt always starts with "[".
        if self.peek() != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string.
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        if self.back_at_prompt_pattern is None:
            # Escape the username so that regex metacharacters in it are
            # matched literally.
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE)

        return bool(self.back_at_prompt_pattern.search(self._decode(data)))

    def log_console_line(self, line):
        """Write one decoded console line to the output log and to stdout."""
        self.log.debug("Get in function log_console_line()")
        self.log_output.info(line)
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Number of pending bytes on the port once the input has settled.

        Polls ``self.con.in_waiting`` every half second until two consecutive
        readings agree, so reading this property always takes at least half
        a second.
        """
        previous = 0
        time.sleep(0.5)

        while previous != self.con.in_waiting:
            previous = self.con.in_waiting
            time.sleep(0.5)

        return self.con.in_waiting

    def line_in_buffer(self):
        """Return True when the buffer holds at least one newline."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Log every complete line that is currently available.

        Stops as soon as a refill of the buffer yields no complete line; a
        trailing partial line stays in the buffer.
        """
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: %s", len(self.buffer))

            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                self.log_console_line(self._decode(self.readline()))

    def login(self, password):
        """Log in on the console as ``self.username``.

        Returns:
            True when the shell prompt is reached (or was already there),
            False when no username is set or the login is rejected.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        self.log_console_line(self._decode(self.readline()))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines till we get "login: "
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting
            # we do not get the complete string.
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if self._LOGIN_PROMPT_PATTERN.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            # Besides logging, this waits for the input to settle and pulls
            # it into the buffer before the next line is consumed.
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self._decode(self.readline()))

        # We can login
        self.con.write("{}\n".format(self.username).encode())
        self.con.flush()
        # Read the login line out of the buffer
        data = self.readline()
        self.log.debug("This is the login:%s", data)
        self.log_console_line(self._decode(data))

        # The value is not needed: reading self.in_waiting simply waits until
        # the full "Password:" string has arrived.
        self.in_waiting

        self.con.write("{}\n".format(password).encode())
        self.con.flush()
        # Print the 'Password:' line
        self.log_console_line(self._decode(self.readline()))

        while not self.back_at_prompt():
            line = self._decode(self.readline())
            self.log_console_line(line)
            # Without this check a rejected login would wait forever for a
            # shell prompt that never shows up.
            if self._LOGIN_FAILED_MARKER in line:
                self.log.error("Login failed")
                return False

        return True

    def write(self, string):
        """Encode *string*, send it to the console and flush the port."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run *command* in the remote shell and return its exit status.

        The command is sent with ``; echo "END: $?"`` appended. All output up
        to the next prompt is logged. The last line read before the prompt is
        returned as a string, with ``END: `` removed and whitespace stripped.
        If the prompt is already visible right after the first line, that
        first line is what gets returned.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first.
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer.
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        # The exit code is in the last line we have read.
        last_line = self._decode(data)
        self.log.debug(last_line)
        exit_code = last_line.replace("END: ", "").strip()
        self.log.debug(exit_code)
        return exit_code