Log all messages to a recipe with the name of the corresponding test
[nitsi.git] / serial_connection.py
CommitLineData
f0f16021
JS
1#!/usr/bin/python3
2import serial
3
4import re
1ed8ca9f 5import os
f0f16021
JS
6
7from time import sleep
8import sys
1ed8ca9f
JS
9import logging
10
11logger = logging.getLogger("nitsi.serial")
f0f16021
JS
12
class serial_connection:
    """Line-oriented helper around a serial console.

    Incoming bytes are kept in ``self.buffer`` so that callers can look at
    pending data (``peek``) without consuming it, detect a shell prompt of
    the form ``[<username>@...]#`` and run commands on the remote shell.
    """

    # Seconds to sleep between two polls of ``con.in_waiting`` while waiting
    # for the amount of pending input to stop changing (see ``in_waiting``).
    settle_interval = 0.5

    def __init__(self, device, username=None):
        """Open the serial ``device`` and prepare an empty read buffer.

        ``username`` is used for logging in and for recognising the prompt.
        """
        self.buffer = b""
        # Compiled lazily by back_at_prompt() on first use.
        self.back_at_prompt_pattern = None
        self.username = username
        self.log = logger.getChild(os.path.basename(device))
        self.log.setLevel(logging.INFO)
        self.con = serial.Serial(device)

    @staticmethod
    def _decode(data):
        """Decode console bytes to text, replacing undecodable bytes.

        Raw console output is not guaranteed to be valid UTF-8; a strict
        decode would raise UnicodeDecodeError on the first stray byte.
        """
        return data.decode(errors="replace")

    def read(self, size=1):
        """Return ``size`` bytes, taking them from the buffer first.

        If the buffer holds fewer than ``size`` bytes, the buffer is drained
        and the missing bytes are requested from the connection.
        """
        if len(self.buffer) >= size:
            data = self.buffer[:size]
            # Keep the bytes that were not handed out.
            self.buffer = self.buffer[size:]
            return data

        data = self.buffer
        # Only the bytes the buffer could not provide are read now.
        size = size - len(self.buffer)
        self.buffer = b""
        return data + self.con.read(size)

    def peek(self, size=1):
        """Return up to ``size`` bytes from the front without consuming them.

        The buffer is topped up from the connection only when it holds fewer
        than ``size`` bytes.
        """
        if len(self.buffer) < size:
            self.buffer += self.con.read(size=size - len(self.buffer))

        return self.buffer[:size]

    def readline(self):
        """Return the next line (including its trailing newline) as bytes.

        Everything currently pending on the connection is pulled into the
        buffer first. If the buffer then contains a newline, the line is cut
        from the buffer; otherwise the buffer is drained and the rest of the
        line is obtained from ``con.readline()``.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at {}".format(size))
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(data)
            self.log.debug(self.buffer)
            return data

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True if the pending data contains a ``[user@...]#`` prompt.

        The check is skipped (returns False) unless the next byte is ``[``.
        """
        if self.peek() != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string.
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        if self.back_at_prompt_pattern is None:
            # The username is matched literally, so regex metacharacters in
            # it (e.g. ".") must be escaped.
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE,
            )

        return bool(self.back_at_prompt_pattern.search(self._decode(data)))

    def log_console_line(self, line):
        """Write one line of console output to stdout."""
        self.log.debug("Get in function log_console_line()")
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Number of pending bytes on the connection once it stopped growing.

        Sleeps ``settle_interval`` seconds, then keeps polling
        ``con.in_waiting`` (sleeping between polls) until two consecutive
        readings agree.
        """
        in_waiting_before = 0
        sleep(self.settle_interval)

        while in_waiting_before != self.con.in_waiting:
            in_waiting_before = self.con.in_waiting
            sleep(self.settle_interval)

        return self.con.in_waiting

    def line_in_buffer(self):
        """Return True if the buffer holds at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Print every complete line that is currently available.

        The buffer is refilled and drained line by line until a refill
        yields no complete line any more.
        """
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: {}".format(len(self.buffer)))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                self.log_console_line(self._decode(self.readline()))

    def login(self, password):
        """Log in as ``self.username`` with ``password``.

        Returns True when a shell prompt is reached (or was already there),
        False when no username is configured or the login was rejected.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(self._decode(data))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Compiled once; the loop below may run many times.
        login_pattern = re.compile(r"^.*login: ")

        # Read all lines till we get login:
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting
            # we do not get the complete string.
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if login_pattern.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self._decode(self.readline()))

        # We can login
        string = "{}\n".format(self.username)
        self.con.write(string.encode())
        self.con.flush()
        # read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:{}".format(data))
        self.log_console_line(self._decode(data))

        # The value is not needed: reading the property only waits until the
        # full "Password:" string has arrived.
        self.in_waiting

        string = "{}\n".format(password)
        self.con.write(string.encode())
        self.con.flush()
        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            line = self._decode(self.readline())
            self.log_console_line(line)
            # Without this check a rejected login never reaches a prompt and
            # the loop would wait for further lines forever.
            # NOTE(review): "Login incorrect" is presumably what the remote
            # login program prints on failure - verify against the target.
            if "Login incorrect" in line:
                self.log.error("Login failed for user {}".format(self.username))
                return False

        return True

    def write(self, string):
        """Encode ``string`` and send it over the connection."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run ``command`` on the remote shell and return its exit code.

        The command is followed by ``echo "END: $?"``; all output is printed
        until the prompt is back. The last line read before the prompt is
        returned as a string with the ``END: `` marker and surrounding
        whitespace removed.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first.
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer.
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        # We saved our exit code in data (the last line)
        text = self._decode(data)
        self.log.debug(text)
        text = text.replace("END: ", "")
        self.log.debug(text)
        self.log.debug(text.strip())
        return text.strip()