4dc524e557330eed13375d50e2dde0e9596a87e6
[nitsi.git] / serial_connection.py
1 #!/usr/bin/python3
import logging
import re
import sys
from time import sleep

import serial
8
class serial_connection():
    """Buffered helper for driving a login shell over a serial console.

    Bytes read from the port are kept in ``self.buffer`` so that callers can
    look ahead (``peek``) without consuming data, read whole lines and detect
    when the shell prompt ``[<username>@...]#`` is shown again.

    Attributes:
        buffer: bytes already read from the port but not yet consumed.
        back_at_prompt_pattern: lazily compiled prompt regex (``None`` until
            ``back_at_prompt`` needs it for the first time).
        username: account used by ``login`` and expected in the prompt.
        log: ``logging.Logger`` used for diagnostics.
        con: the underlying ``serial.Serial`` object.
    """

    # Seconds between two polls of the port's input counter in ``in_waiting``.
    settle_delay = 0.5

    # Text whose presence in a console line is treated as a rejected login.
    # NOTE(review): this is the message printed by the common login(1)
    # implementations - confirm it matches the target system.
    login_failed_marker = b"Login incorrect"

    def __init__(self, device, username=None):
        """Open the serial port *device*.

        Args:
            device: port name handed to ``serial.Serial``.
            username: account name used for ``login`` and prompt detection.
        """
        self.buffer = b""
        self.back_at_prompt_pattern = None
        self.username = username
        # No ``log`` helper is defined or imported in this module, so use the
        # standard library logger (it offers the same debug()/error() calls).
        self.log = logging.getLogger(__name__)
        self.con = serial.Serial(device)

    @staticmethod
    def _decode(data):
        """Decode console bytes, replacing undecodable bytes (line noise)."""
        return data.decode(errors="replace")

    def _send_line(self, text):
        """Send *text* plus a newline without logging it (it may be a password)."""
        self.con.write("{}\n".format(text).encode())
        self.con.flush()

    def close(self):
        """Close the underlying serial port."""
        self.con.close()

    def read(self, size=1):
        """Return *size* bytes, serving buffered bytes before reading the port."""
        data = self.buffer[:size]
        self.buffer = self.buffer[size:]

        # Only touch the port for the part the buffer could not provide
        missing = size - len(data)
        if missing > 0:
            data += self.con.read(missing)

        return data

    def peek(self, size=1):
        """Return the next *size* bytes without consuming them.

        Bytes missing from the buffer are read from the port and appended to
        the buffer first.
        """
        missing = size - len(self.buffer)
        if missing > 0:
            self.buffer += self.con.read(size=missing)

        return self.buffer[:size]

    def readline(self):
        """Return the next line, including its trailing newline, as bytes.

        Everything currently waiting on the port is pulled into the buffer
        first. If the buffer then holds no complete line, the rest of the
        line is read directly from the port.
        """
        self.log.debug("Buffer before reading: %r", self.buffer)
        self.buffer += self.con.read(self.con.in_waiting)

        line, newline, rest = self.buffer.partition(b"\n")
        if newline:
            self.log.debug("We have a whole line in the buffer")
            self.log.debug("We split at %d", len(line) + 1)
            self.buffer = rest
            self.log.debug("Line: %r", line + newline)
            self.log.debug("Remaining buffer: %r", self.buffer)
            return line + newline

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True if the pending input contains the shell prompt.

        The prompt is expected to look like ``[<username>@...]#`` at the
        start of a line.
        """
        if self.peek() != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        if self.back_at_prompt_pattern is None:
            # Escape the username so that characters such as "." are matched
            # literally and not as regex syntax
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE)

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Write one line of console output to stdout."""
        self.log.debug("Get in function log_console_line()")
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Number of bytes waiting on the port once the count stopped growing.

        The counter is polled every ``settle_delay`` seconds until two
        consecutive polls return the same value.
        """
        previous = 0
        sleep(self.settle_delay)
        current = self.con.in_waiting

        while current != previous:
            previous = current
            sleep(self.settle_delay)
            current = self.con.in_waiting

        return current

    def line_in_buffer(self):
        """Return True if the buffer holds at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Print every complete line that is pending on the console."""
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: %d", len(self.buffer))

            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                self.log_console_line(self._decode(self.readline()))

    def login(self, password):
        """Log in as ``self.username`` using *password*.

        Returns:
            True if the shell prompt is shown (also when a session was
            already open), False if no username is set or if the console
            reports a rejected login.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(self._decode(data))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines till we get login:
        login_pattern = re.compile(r"^.*login: ")
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting
            # we do not get the complete string
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if login_pattern.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug("Pending data: %r", data)
            self.log_console_line(self._decode(self.readline()))

        # We can login
        self._send_line(self.username)
        # Read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:%r", data)
        self.log_console_line(self._decode(data))

        # We need to wait here till we get the full string "Password:".
        # The value is not used, but self.in_waiting waits the correct
        # amount of time.
        self.in_waiting

        self._send_line(password)
        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

            # Without this check a rejected login would make us wait for a
            # shell prompt that never appears
            if self.login_failed_marker in data:
                self.log.error("Login failed for user %s", self.username)
                return False

        return True

    def write(self, string):
        """Encode *string*, send it to the console and flush the port."""
        self.log.debug("%s", string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run *command* in the remote shell and return its exit code.

        ``; echo "END: $?"`` is appended to the command so that the last
        line printed before the prompt carries the exit code. The output is
        echoed to stdout while it is read.

        Returns:
            The exit code as a string, e.g. ``"0"``.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        # We saved our exit code in data (the last line)
        last_line = self._decode(data)
        self.log.debug("%s", last_line)
        exit_code = last_line.replace("END: ", "").strip()
        self.log.debug("%s", exit_code)
        return exit_code