]>
git.ipfire.org Git - nitsi.git/blob - src/nitsi/serial_connection.py
12 log
= logging
.getLogger("nitsi.serial")
14 class SerialConnection():
15 def __init__(self
, device
, username
=None, password
=None, log_file
=None, name
=None, log_start_time
=None, longest_machine_name
=10):
17 self
.back_at_prompt_pattern
= None
18 self
.username
= username
19 self
.password
= password
22 self
.log_file
= log_file
23 self
.log
= log
.getChild(name
)
24 self
.log
.setLevel(logging
.INFO
)
26 # We create here a closed serial connection
27 self
.con
= serial
.Serial()
28 # Set the port in a second step to avoid that the connection is brought up automatically
29 self
.con
.port
= self
.device
31 self
.log_output
= self
.log
.getChild("output")
32 # Do not propagate the output to ancestor loggers as it looks ugly
33 self
.log_output
.propagate
= False
34 # Logging handler for file
35 log_file_handler
= logging
.FileHandler(self
.log_file
)
36 log_file_handler
.setLevel(logging
.INFO
)
37 log_file_handler
.terminator
= ""
38 # Loggin Handler for Stream
39 stream_handler
= logging
.StreamHandler()
40 stream_handler
.setLevel(logging
.INFO
)
41 stream_handler
.terminator
= ""
42 formatter
= logger
.TestFormatter(name
=self
.name
,
43 start_time
=log_start_time
,
44 longest_machine_name
=longest_machine_name
)
45 log_file_handler
.setFormatter(formatter
)
46 stream_handler
.setFormatter(formatter
)
47 self
.log_output
.addHandler(log_file_handler
)
48 self
.log_output
.addHandler(stream_handler
)
51 # Check if the serial port is open, if not open the port
53 self
.log
.debug("Connection to the serial port is open")
55 self
.log
.debug("Connection to the serial port is closed, try to open it.")
57 # Try to login, if we are already logged in we detect this and exit the function
61 if not self
.con
.is_open
:
62 self
.log
.debug("Connection to the serial port is already closed")
64 self
.log
.debug("Connection to the serial port is open, try to close it.")
67 def read(self
, size
=1):
68 if len(self
.buffer) >= size
:
69 # throw away first size bytes in buffer
70 data
= self
.buffer[:size
]
71 # Set the buffer to the non used bytes
72 self
.buffer = self
.buffer[size
:]
76 # Set the size to the value we have to read now
77 size
= size
- len(self
.buffer)
78 # Set the buffer empty
80 return data
+ self
.con
.read(size
)
82 def peek(self
, size
=1):
83 if len(self
.buffer) <= size
:
84 self
.buffer += self
.con
.read(size
=size
- len(self
.buffer))
86 return self
.buffer[:size
]
89 self
.log
.debug(self
.buffer)
90 self
.buffer = self
.buffer + self
.con
.read(self
.con
.in_waiting
)
91 if b
"\n" in self
.buffer:
92 size
= self
.buffer.index(b
"\n") + 1
93 self
.log
.debug("We have a whole line in the buffer")
94 self
.log
.debug(self
.buffer)
95 self
.log
.debug("We split at {}".format(size
))
96 data
= self
.buffer[:size
]
97 self
.buffer = self
.buffer[size
:]
99 self
.log
.debug(self
.buffer)
104 return data
+ self
.con
.readline()
106 def back_at_prompt(self
):
107 self
.log
.debug("Check if we are back at prompt")
109 self
.log
.debug("First char in buffer is: '{}'".format(data
.decode()))
113 # We need to use self.in_waiting because with self.con.in_waiting we get
114 # not the complete string
115 size
= len(self
.buffer) + self
.in_waiting
116 data
= self
.peek(size
)
117 self
.log
.debug("Data is: '{}'".format(data
))
119 # When we have an \n in the buffer we are not at the prompt,
120 # instead we still have a line in the buffer
121 if self
.line_in_buffer():
124 if self
.back_at_prompt_pattern
== None:
125 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
126 self
.back_at_prompt_pattern
= re
.compile(r
"^\[{}@.+\]#".format(self
.username
), re
.MULTILINE
)
128 if self
.back_at_prompt_pattern
.search(data
.decode()):
133 def log_console_line(self
, line
):
134 self
.log
.debug("Get in function log_console_line()")
135 self
.log_output
.info(line
)
136 #sys.stdout.write(line)
139 def in_waiting(self
):
140 in_waiting_before
= 0
143 while in_waiting_before
!= self
.con
.in_waiting
:
144 in_waiting_before
= self
.con
.in_waiting
147 return self
.con
.in_waiting
149 def line_in_buffer(self
):
150 if b
"\n" in self
.buffer:
155 def print_lines_in_buffer(self
):
157 self
.log
.debug("Fill buffer ...")
158 self
.peek(len(self
.buffer) + self
.in_waiting
)
159 self
.log
.debug("Current buffer length: {}".format(len(self
.buffer)))
160 if self
.line_in_buffer() == True:
161 while self
.line_in_buffer() == True:
162 data
= self
.readline()
163 self
.log_console_line(data
.decode())
165 self
.log
.debug("We have printed all lines in the buffer")
169 if self
.username
== None:
170 self
.log
.error("Username cannot be blank")
173 self
.print_lines_in_buffer()
175 # Hit enter to see what we get
176 self
.con
.write(b
'\n')
177 # We get two new lines \r\n ?
178 data
= self
.readline()
179 self
.log_console_line(data
.decode())
181 self
.print_lines_in_buffer()
183 if self
.back_at_prompt():
184 self
.log
.debug("We are already logged in.")
187 # Read all line till we get login:
189 # We need to use self.in_waiting because with self.con.in_waiting we get
190 # not the complete string
191 size
= len(self
.buffer) + self
.in_waiting
192 data
= self
.peek(size
)
194 pattern
= r
"^.*login: "
195 pattern
= re
.compile(pattern
)
197 if pattern
.search(data
.decode()):
200 self
.log
.debug("The pattern does not match")
201 self
.log
.debug(self
.peek(len(self
.buffer) + self
.in_waiting
))
202 self
.log_console_line(self
.readline().decode())
205 string
= "{}\n".format(self
.username
)
206 self
.con
.write(string
.encode())
208 # read the login out of the buffer
209 data
= self
.readline()
210 self
.log
.debug("This is the login:{}".format(data
))
211 self
.log_console_line(data
.decode())
213 # We need to wait her till we get the full string "Password:"
214 #This is useless but self.in_waiting will wait the correct amount of time
215 size
= self
.in_waiting
217 string
= "{}\n".format(self
.password
)
218 self
.con
.write(string
.encode())
220 # Print the 'Password:' line
221 data
= self
.readline()
222 self
.log_console_line(data
.decode())
224 while not self
.back_at_prompt():
225 # This will fail if the login failed so we need to look for the failed keyword
226 data
= self
.readline()
227 self
.log_console_line(data
.decode())
231 def write(self
, string
):
232 self
.log
.debug(string
)
233 self
.con
.write(string
.encode())
236 def command(self
, command
):
237 self
.write("{}; echo \"END: $?\"\n".format(command
))
239 # We need to read out the prompt for this command first
240 # If we do not do this we will break the loop immediately
241 # because the prompt for this command is still in the buffer
242 data
= self
.readline()
243 self
.log_console_line(data
.decode())
245 while not self
.back_at_prompt():
246 data
= self
.readline()
247 self
.log_console_line(data
.decode())
249 self
.log
.debug("We get a prompt so we should be in the last line")
251 # We saved our exit code in data (the last line)
252 self
.log
.debug(data
.decode())
253 data
= data
.decode().replace("END: ", "")
255 self
.log
.debug(data
.strip())