Add .gitignore
[nitsi.git] / src / nitsi / serial_connection.py
1 #!/usr/bin/python3
2 import serial
3
4 import re
5 import os
6
7 from time import sleep
8 import sys
9 import logging
10
11 logger = logging.getLogger("nitsi.serial")
12
13 class serial_connection():
14     def __init__(self, device, username=None):
15         self.buffer = b""
16         self.back_at_prompt_pattern =  None
17         self.username = username
18         self.log = logger.getChild(os.path.basename(device))
19         self.log.setLevel(logging.INFO)
20         self.con = serial.Serial(device)
21
22     def read(self, size=1):
23         if len(self.buffer) >= size:
24             # throw away first size bytes in buffer
25             data =  self.buffer[:size]
26             # Set the buffer to the non used bytes
27             self.buffer = self.buffer[size:]
28             return data
29         else:
30             data = self.buffer
31             # Set the size to the value we have to read now
32             size = size - len(self.buffer)
33             # Set the buffer empty
34             self.buffer = b""
35             return data + self.con.read(size)
36
37     def peek(self, size=1):
38         if len(self.buffer) <= size:
39             self.buffer += self.con.read(size=size - len(self.buffer))
40
41         return self.buffer[:size]
42
43     def readline(self):
44         self.log.debug(self.buffer)
45         self.buffer = self.buffer + self.con.read(self.con.in_waiting)
46         if b"\n" in self.buffer:
47             size = self.buffer.index(b"\n") + 1
48             self.log.debug("We have a whole line in the buffer")
49             self.log.debug(self.buffer)
50             self.log.debug("We split at {}".format(size))
51             data = self.buffer[:size]
52             self.buffer = self.buffer[size:]
53             self.log.debug(data)
54             self.log.debug(self.buffer)
55             return data
56
57         data = self.buffer
58         self.buffer = b""
59         return data + self.con.readline()
60
61     def back_at_prompt(self):
62         data = self.peek()
63         if not data == b"[":
64             return False
65
66         # We need to use self.in_waiting because with self.con.in_waiting we get
67         # not the complete string
68         size = len(self.buffer) + self.in_waiting
69         data = self.peek(size)
70
71
72         if self.back_at_prompt_pattern == None:
73             #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
74             self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
75
76         if self.back_at_prompt_pattern.search(data.decode()):
77             return True
78         else:
79             return False
80
81     def log_console_line(self, line):
82         self.log.debug("Get in function log_console_line()")
83         sys.stdout.write(line)
84
85     @property
86     def in_waiting(self):
87         in_waiting_before = 0
88         sleep(0.5)
89
90         while in_waiting_before != self.con.in_waiting:
91             in_waiting_before = self.con.in_waiting
92             sleep(0.5)
93
94         return self.con.in_waiting
95
96     def line_in_buffer(self):
97         if b"\n" in self.buffer:
98             return True
99
100         return False
101
102     def print_lines_in_buffer(self):
103         while True:
104             self.log.debug("Fill buffer ...")
105             self.peek(len(self.buffer) + self.in_waiting)
106             self.log.debug("Current buffer length: {}".format(len(self.buffer)))
107             if self.line_in_buffer() == True:
108                 while self.line_in_buffer() == True:
109                     data = self.readline()
110                     self.log_console_line(data.decode())
111             else:
112                 self.log.debug("We have printed all lines in the buffer")
113                 break
114
115     def login(self, password):
116         if self.username == None:
117             self.log.error("Username cannot be blank")
118             return False
119
120         self.print_lines_in_buffer()
121
122         # Hit enter to see what we get
123         self.con.write(b'\n')
124         # We get two new lines \r\n ?
125         data = self.readline()
126         self.log_console_line(data.decode())
127
128         self.print_lines_in_buffer()
129
130         if self.back_at_prompt():
131             self.log.debug("We are already logged in.")
132             return True
133
134         # Read all line till we get login:
135         while 1:
136             # We need to use self.in_waiting because with self.con.in_waiting we get
137             # not the complete string
138             size = len(self.buffer) + self.in_waiting
139             data = self.peek(size)
140
141             pattern = r"^.*login: "
142             pattern = re.compile(pattern)
143
144             if pattern.search(data.decode()):
145                 break
146             else:
147                 self.log.debug("The pattern does not match")
148                 self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
149                 self.log_console_line(self.readline().decode())
150
151         # We can login
152         string = "{}\n".format(self.username)
153         self.con.write(string.encode())
154         self.con.flush()
155         # read the login out of the buffer
156         data = self.readline()
157         self.log.debug("This is the login:{}".format(data))
158         self.log_console_line(data.decode())
159
160         # We need to wait her till we get the full string "Password:"
161         #This is useless but self.in_waiting will wait the correct amount of time
162         size = self.in_waiting
163
164         string = "{}\n".format(password)
165         self.con.write(string.encode())
166         self.con.flush()
167         # Print the 'Password:' line
168         data = self.readline()
169         self.log_console_line(data.decode())
170
171         while not self.back_at_prompt():
172             # This will fail if the login failed so we need to look for the failed keyword
173             data = self.readline()
174             self.log_console_line(data.decode())
175
176         return True
177
178     def write(self, string):
179         self.log.debug(string)
180         self.con.write(string.encode())
181         self.con.flush()
182
183     def command(self, command):
184         self.write("{}; echo \"END: $?\"\n".format(command))
185
186         # We need to read out the prompt for this command first
187         # If we do not do this we will break the loop immediately
188         # because the prompt for this command is still in the buffer
189         data = self.readline()
190         self.log_console_line(data.decode())
191
192         while not self.back_at_prompt():
193             data = self.readline()
194             self.log_console_line(data.decode())
195
196         # We saved our exit code in data (the last line)
197         self.log.debug(data.decode())
198         data = data.decode().replace("END: ", "")
199         self.log.debug(data)
200         self.log.debug(data.strip())
201         return data.strip()