We now use the logging module
[nitsi.git] / serial_connection.py
1 #!/usr/bin/python3
2 import serial
3
4 import re
5 import os
6
7 from time import sleep
8 import sys
9 import logging
10
# Parent logger of this module; serial_connection.__init__ derives one child
# logger per device from it via logger.getChild().
logger = logging.getLogger("nitsi.serial")
12
13 class serial_connection():
14     def __init__(self, device, username=None):
15         self.buffer = b""
16         self.back_at_prompt_pattern =  None
17         self.username = username
18         self.log = logger.getChild(os.path.basename(device))
19         self.con = serial.Serial(device)
20
21     def read(self, size=1):
22         if len(self.buffer) >= size:
23             # throw away first size bytes in buffer
24             data =  self.buffer[:size]
25             # Set the buffer to the non used bytes
26             self.buffer = self.buffer[size:]
27             return data
28         else:
29             data = self.buffer
30             # Set the size to the value we have to read now
31             size = size - len(self.buffer)
32             # Set the buffer empty
33             self.buffer = b""
34             return data + self.con.read(size)
35
36     def peek(self, size=1):
37         if len(self.buffer) <= size:
38             self.buffer += self.con.read(size=size - len(self.buffer))
39
40         return self.buffer[:size]
41
42     def readline(self):
43         self.log.debug(self.buffer)
44         self.buffer = self.buffer + self.con.read(self.con.in_waiting)
45         if b"\n" in self.buffer:
46             size = self.buffer.index(b"\n") + 1
47             self.log.debug("We have a whole line in the buffer")
48             self.log.debug(self.buffer)
49             self.log.debug("We split at {}".format(size))
50             data = self.buffer[:size]
51             self.buffer = self.buffer[size:]
52             self.log.debug(data)
53             self.log.debug(self.buffer)
54             return data
55
56         data = self.buffer
57         self.buffer = b""
58         return data + self.con.readline()
59
60     def back_at_prompt(self):
61         data = self.peek()
62         if not data == b"[":
63             return False
64
65         # We need to use self.in_waiting because with self.con.in_waiting we get
66         # not the complete string
67         size = len(self.buffer) + self.in_waiting
68         data = self.peek(size)
69
70
71         if self.back_at_prompt_pattern == None:
72             #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
73             self.back_at_prompt_pattern = re.compile(r"^\[{}@.+\]#".format(self.username), re.MULTILINE)
74
75         if self.back_at_prompt_pattern.search(data.decode()):
76             return True
77         else:
78             return False
79
80     def log_console_line(self, line):
81         self.log.debug("Get in function log_console_line()")
82         sys.stdout.write(line)
83
    @property
    def in_waiting(self):
        """Return self.con.in_waiting after its value has stopped changing.

        The property sleeps for 0.5 seconds, then keeps polling
        self.con.in_waiting in 0.5 second steps for as long as the value
        differs from the one recorded at the previous poll.  Every access
        therefore takes at least half a second.
        """
        # Value recorded at the previous poll; starting at 0 means a port
        # with nothing waiting leaves the loop after the initial sleep.
        in_waiting_before = 0
        sleep(0.5)

        # Still changing: remember the new value and wait another step
        while in_waiting_before != self.con.in_waiting:
            in_waiting_before = self.con.in_waiting
            sleep(0.5)

        return self.con.in_waiting
94
95     def line_in_buffer(self):
96         if b"\n" in self.buffer:
97             return True
98
99         return False
100
101     def print_lines_in_buffer(self):
102         while True:
103             self.log.debug("Fill buffer ...")
104             self.peek(len(self.buffer) + self.in_waiting)
105             self.log.debug("Current buffer length: {}".format(len(self.buffer)))
106             if self.line_in_buffer() == True:
107                 while self.line_in_buffer() == True:
108                     data = self.readline()
109                     self.log_console_line(data.decode())
110             else:
111                 self.log.debug("We have printed all lines in the buffer")
112                 break
113
114     def login(self, password):
115         if self.username == None:
116             self.log.error("Username cannot be blank")
117             return False
118
119         self.print_lines_in_buffer()
120
121         # Hit enter to see what we get
122         self.con.write(b'\n')
123         # We get two new lines \r\n ?
124         data = self.readline()
125         self.log_console_line(data.decode())
126
127         self.print_lines_in_buffer()
128
129         if self.back_at_prompt():
130             self.log.debug("We are already logged in.")
131             return True
132
133         # Read all line till we get login:
134         while 1:
135             # We need to use self.in_waiting because with self.con.in_waiting we get
136             # not the complete string
137             size = len(self.buffer) + self.in_waiting
138             data = self.peek(size)
139
140             pattern = r"^.*login: "
141             pattern = re.compile(pattern)
142
143             if pattern.search(data.decode()):
144                 break
145             else:
146                 self.log.debug("The pattern does not match")
147                 self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
148                 self.log_console_line(self.readline().decode())
149
150         # We can login
151         string = "{}\n".format(self.username)
152         self.con.write(string.encode())
153         self.con.flush()
154         # read the login out of the buffer
155         data = self.readline()
156         self.log.debug("This is the login:{}".format(data))
157         self.log_console_line(data.decode())
158
159         # We need to wait her till we get the full string "Password:"
160         #This is useless but self.in_waiting will wait the correct amount of time
161         size = self.in_waiting
162
163         string = "{}\n".format(password)
164         self.con.write(string.encode())
165         self.con.flush()
166         # Print the 'Password:' line
167         data = self.readline()
168         self.log_console_line(data.decode())
169
170         while not self.back_at_prompt():
171             # This will fail if the login failed so we need to look for the failed keyword
172             data = self.readline()
173             self.log_console_line(data.decode())
174
175         return True
176
177     def write(self, string):
178         self.log.debug(string)
179         self.con.write(string.encode())
180         self.con.flush()
181
182     def command(self, command):
183         self.write("{}; echo \"END: $?\"\n".format(command))
184
185         # We need to read out the prompt for this command first
186         # If we do not do this we will break the loop immediately
187         # because the prompt for this command is still in the buffer
188         data = self.readline()
189         self.log_console_line(data.decode())
190
191         while not self.back_at_prompt():
192             data = self.readline()
193             self.log_console_line(data.decode())
194
195         # We saved our exit code in data (the last line)
196         self.log.debug(data.decode())
197         data = data.decode().replace("END: ", "")
198         self.log.debug(data)
199         self.log.debug(data.strip())
200         return data.strip()