Move the serial_connection class into a seperated file
[nitsi.git] / serial_connection.py
CommitLineData
f0f16021
JS
1#!/usr/bin/python3
2import serial
3
4import re
5
6from time import sleep
7import sys
8
9class serial_connection():
10 def __init__(self, device, username=None):
11 self.buffer = b""
12 self.back_at_prompt_pattern = None
13 self.username = username
14 self.log = log(1)
15 self.con = serial.Serial(device)
16
17 def read(self, size=1):
18 if len(self.buffer) >= size:
19 # throw away first size bytes in buffer
20 data = self.buffer[:size]
21 # Set the buffer to the non used bytes
22 self.buffer = self.buffer[size:]
23 return data
24 else:
25 data = self.buffer
26 # Set the size to the value we have to read now
27 size = size - len(self.buffer)
28 # Set the buffer empty
29 self.buffer = b""
30 return data + self.con.read(size)
31
32 def peek(self, size=1):
33 if len(self.buffer) <= size:
34 self.buffer += self.con.read(size=size - len(self.buffer))
35
36 return self.buffer[:size]
37
38 def readline(self):
39 self.log.debug(self.buffer)
40 self.buffer = self.buffer + self.con.read(self.con.in_waiting)
41 if b"\n" in self.buffer:
42 size = self.buffer.index(b"\n") + 1
43 self.log.debug("We have a whole line in the buffer")
44 self.log.debug(self.buffer)
45 self.log.debug("We split at {}".format(size))
46 data = self.buffer[:size]
47 self.buffer = self.buffer[size:]
48 self.log.debug(data)
49 self.log.debug(self.buffer)
50 return data
51
52 data = self.buffer
53 self.buffer = b""
54 return data + self.con.readline()
55
56 def back_at_prompt(self):
57 data = self.peek()
58 if not data == b"[":
59 return False
60
61 # We need to use self.in_waiting because with self.con.in_waiting we get
62 # not the complete string
63 size = len(self.buffer) + self.in_waiting
64 data = self.peek(size)
65
66
67 if self.back_at_prompt_pattern == None:
68 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
69 self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
70
71 if self.back_at_prompt_pattern.search(data.decode()):
72 return True
73 else:
74 return False
75
76 def log_console_line(self, line):
77 self.log.debug("Get in function log_console_line()")
78 sys.stdout.write(line)
79
80 @property
81 def in_waiting(self):
82 in_waiting_before = 0
83 sleep(0.5)
84
85 while in_waiting_before != self.con.in_waiting:
86 in_waiting_before = self.con.in_waiting
87 sleep(0.5)
88
89 return self.con.in_waiting
90
91 def line_in_buffer(self):
92 if b"\n" in self.buffer:
93 return True
94
95 return False
96
97 def print_lines_in_buffer(self):
98 while True:
99 self.log.debug("Fill buffer ...")
100 self.peek(len(self.buffer) + self.in_waiting)
101 self.log.debug("Current buffer length: {}".format(len(self.buffer)))
102 if self.line_in_buffer() == True:
103 while self.line_in_buffer() == True:
104 data = self.readline()
105 self.log_console_line(data.decode())
106 else:
107 self.log.debug("We have printed all lines in the buffer")
108 break
109
110 def login(self, password):
111 if self.username == None:
112 self.log.error("Username cannot be blank")
113 return False
114
115 self.print_lines_in_buffer()
116
117 # Hit enter to see what we get
118 self.con.write(b'\n')
119 # We get two new lines \r\n ?
120 data = self.readline()
121 self.log_console_line(data.decode())
122
123 self.print_lines_in_buffer()
124
125 if self.back_at_prompt():
126 self.log.debug("We are already logged in.")
127 return True
128
129 # Read all line till we get login:
130 while 1:
131 # We need to use self.in_waiting because with self.con.in_waiting we get
132 # not the complete string
133 size = len(self.buffer) + self.in_waiting
134 data = self.peek(size)
135
136 pattern = r"^.*login: "
137 pattern = re.compile(pattern)
138
139 if pattern.search(data.decode()):
140 break
141 else:
142 self.log.debug("The pattern does not match")
143 self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
144 self.log_console_line(self.readline().decode())
145
146 # We can login
147 string = "{}\n".format(self.username)
148 self.con.write(string.encode())
149 self.con.flush()
150 # read the login out of the buffer
151 data = self.readline()
152 self.log.debug("This is the login:{}".format(data))
153 self.log_console_line(data.decode())
154
155 # We need to wait her till we get the full string "Password:"
156 #This is useless but self.in_waiting will wait the correct amount of time
157 size = self.in_waiting
158
159 string = "{}\n".format(password)
160 self.con.write(string.encode())
161 self.con.flush()
162 # Print the 'Password:' line
163 data = self.readline()
164 self.log_console_line(data.decode())
165
166 while not self.back_at_prompt():
167 # This will fail if the login failed so we need to look for the failed keyword
168 data = self.readline()
169 self.log_console_line(data.decode())
170
171 return True
172
173 def write(self, string):
174 self.log.debug(string)
175 self.con.write(string.encode())
176 self.con.flush()
177
178 def command(self, command):
179 self.write("{}; echo \"END: $?\"\n".format(command))
180
181 # We need to read out the prompt for this command first
182 # If we do not do this we will break the loop immediately
183 # because the prompt for this command is still in the buffer
184 data = self.readline()
185 self.log_console_line(data.decode())
186
187 while not self.back_at_prompt():
188 data = self.readline()
189 self.log_console_line(data.decode())
190
191 # We saved our exit code in data (the last line)
192 self.log.debug(data.decode())
193 data = data.decode().replace("END: ", "")
194 self.log.debug(data)
195 self.log.debug(data.strip())
196 return data.strip()