]> git.ipfire.org Git - nitsi.git/blame - serial_connection.py
We now use the logging module
[nitsi.git] / serial_connection.py
CommitLineData
f0f16021
JS
1#!/usr/bin/python3
2import serial
3
4import re
1ed8ca9f 5import os
f0f16021
JS
6
7from time import sleep
8import sys
1ed8ca9f
JS
9import logging
10
11logger = logging.getLogger("nitsi.serial")
f0f16021
JS
12
class serial_connection():
    """Buffered helper for driving a login shell over a serial console.

    Bytes read from the underlying ``serial.Serial`` object are kept in
    ``self.buffer`` so callers can peek at pending output (for example to
    detect a shell prompt) without consuming it.

    Attributes:
        buffer: Bytes already read from the port but not yet consumed.
        back_at_prompt_pattern: Compiled prompt regex, built lazily on the
            first call to ``back_at_prompt()``.
        username: User name used for logging in and for prompt detection.
        log: Child logger named after the device's base name.
        con: The underlying ``serial.Serial`` connection.
    """

    # Seconds to sleep between two polls of the port's input counter while
    # waiting for the remote side to stop sending.
    settle_interval = 0.5

    # Matches when the first line of the examined text contains "login: ".
    # Compiled once here instead of on every loop iteration in login().
    _login_prompt_pattern = re.compile(r"^.*login: ")

    # NOTE(review): assumes the remote login program reports a rejected
    # login with the conventional "Login incorrect" message - confirm
    # against the systems under test.
    _login_failed_marker = b"Login incorrect"

    def __init__(self, device, username=None):
        """Open the serial port *device*.

        Args:
            device: Path of the serial device, passed to ``serial.Serial``.
            username: Account used by ``login()`` and ``back_at_prompt()``.
        """
        self.buffer = b""
        self.back_at_prompt_pattern = None
        self.username = username
        self.log = logger.getChild(os.path.basename(device))
        self.con = serial.Serial(device)

    @staticmethod
    def _decode(data):
        """Decode console bytes to text without raising on invalid bytes.

        Undecodable bytes (line noise, or a multi-byte character cut in
        half by a partial read) are replaced instead of raising
        ``UnicodeDecodeError``.
        """
        return data.decode(errors="replace")

    def read(self, size=1):
        """Consume and return *size* bytes, serving the buffer first.

        If the buffer holds fewer than *size* bytes, the remainder is read
        from the port.
        """
        if len(self.buffer) >= size:
            data, self.buffer = self.buffer[:size], self.buffer[size:]
            return data

        # Hand out everything buffered and read only what is still missing.
        data, self.buffer = self.buffer, b""
        return data + self.con.read(size - len(data))

    def peek(self, size=1):
        """Return the first *size* bytes without consuming them.

        Missing bytes are read from the port and appended to the buffer.
        """
        missing = size - len(self.buffer)
        # Only touch the port when bytes are actually missing.
        if missing > 0:
            self.buffer += self.con.read(size=missing)

        return self.buffer[:size]

    def readline(self):
        """Consume and return one line, including its trailing newline.

        Bytes currently pending on the port are pulled into the buffer
        first. If the buffer then holds no complete line, the buffered
        bytes are returned together with the result of the port's own
        ``readline()``.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)

        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at %s", size)
            data, self.buffer = self.buffer[:size], self.buffer[size:]
            self.log.debug(data)
            self.log.debug(self.buffer)
            return data

        data, self.buffer = self.buffer, b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True if the pending output contains the shell prompt.

        The prompt is expected to look like ``[<username>@<anything>]#`` at
        the start of a line. Nothing is consumed from the buffer.
        """
        # Cheap pre-check: a prompt always starts with "[".
        if self.peek() != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string.
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        if self.back_at_prompt_pattern is None:
            # Escape the user name so characters such as "." or "+" are
            # matched literally instead of being read as regex syntax.
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE,
            )

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Write one line of console output to standard output."""
        self.log.debug("Get in function log_console_line()")
        sys.stdout.write(line)

    def _wait_for_quiet_line(self):
        """Block until the port's pending byte count stops changing.

        Polls ``self.con.in_waiting`` every ``settle_interval`` seconds and
        returns the count once two consecutive polls agree.
        """
        pending = 0
        sleep(self.settle_interval)

        while pending != self.con.in_waiting:
            pending = self.con.in_waiting
            sleep(self.settle_interval)

        return self.con.in_waiting

    @property
    def in_waiting(self):
        """Number of bytes pending on the port once the input has settled.

        Unlike ``self.con.in_waiting`` this blocks for at least one
        ``settle_interval``.
        """
        return self._wait_for_quiet_line()

    def line_in_buffer(self):
        """Return True if the buffer holds at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Print every complete line that is currently available.

        Repeatedly pulls settled input into the buffer and prints all whole
        lines, stopping once a fill yields no complete line.
        """
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: %s", len(self.buffer))

            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                self.log_console_line(self._decode(self.readline()))

    def login(self, password):
        """Log in on the console as ``self.username``.

        Args:
            password: Password sent after the user name.

        Returns:
            True if a shell prompt for the user is reached (including the
            case where the console was already logged in). False if no
            user name is set or the login is reported as failed.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(self._decode(data))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines until the pending output starts with the login
        # prompt.
        while True:
            # We need to use self.in_waiting because with
            # self.con.in_waiting we do not get the complete string.
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if self._login_prompt_pattern.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self._decode(self.readline()))

        # We can login
        self.con.write("{}\n".format(self.username).encode())
        self.con.flush()
        # Read the login line out of the buffer
        data = self.readline()
        self.log.debug("This is the login:%s", data)
        self.log_console_line(self._decode(data))

        # The returned count is not needed; the call is made only because
        # it blocks until the complete "Password:" string has arrived.
        self._wait_for_quiet_line()

        self.con.write("{}\n".format(password).encode())
        self.con.flush()
        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))
            # Without this check a rejected login would leave the loop
            # waiting forever for a prompt that never appears.
            if self._login_failed_marker in data:
                self.log.error("Login failed for user %s", self.username)
                return False

        return True

    def write(self, string):
        """Encode *string*, send it to the console and flush the port."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run *command* in the remote shell and return its exit status.

        The command is followed by ``echo "END: $?"`` so the exit status
        appears as the last output line. All output is printed via
        ``log_console_line()``.

        Returns:
            The last line read before the prompt reappeared, with
            ``"END: "`` removed and surrounding whitespace stripped (a
            string, not an int).
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first.
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer.
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        # The exit code is in the last line that was read.
        last_line = self._decode(data)
        self.log.debug(last_line)
        exit_code = last_line.replace("END: ", "")
        self.log.debug(exit_code)
        exit_code = exit_code.strip()
        self.log.debug(exit_code)
        return exit_code