]>
git.ipfire.org Git - nitsi.git/blob - src/nitsi/serial_connection.py
11 from nitsi
.logger
import TestFormatter
13 logger
= logging
.getLogger("nitsi.serial")
15 class serial_connection():
16 def __init__(self
, device
, username
=None, log_file
=None, name
=None, log_start_time
=None, longest_machine_name
=10):
18 self
.back_at_prompt_pattern
= None
19 self
.username
= username
21 self
.log_file
= log_file
22 self
.log
= logger
.getChild(name
)
23 self
.log
.setLevel(logging
.INFO
)
24 self
.con
= serial
.Serial(device
)
26 self
.log_output
= self
.log
.getChild("output")
27 log_file_handler
= logging
.FileHandler(self
.log_file
)
28 log_file_handler
.setLevel(logging
.INFO
)
29 log_file_handler
.terminator
= ""
30 formatter
= TestFormatter(name
=self
.name
,
31 start_time
=log_start_time
,
32 longest_machine_name
=longest_machine_name
)
33 log_file_handler
.setFormatter(formatter
)
34 self
.log_output
.addHandler(log_file_handler
)
36 def read(self
, size
=1):
37 if len(self
.buffer) >= size
:
38 # throw away first size bytes in buffer
39 data
= self
.buffer[:size
]
40 # Set the buffer to the non used bytes
41 self
.buffer = self
.buffer[size
:]
45 # Set the size to the value we have to read now
46 size
= size
- len(self
.buffer)
47 # Set the buffer empty
49 return data
+ self
.con
.read(size
)
51 def peek(self
, size
=1):
52 if len(self
.buffer) <= size
:
53 self
.buffer += self
.con
.read(size
=size
- len(self
.buffer))
55 return self
.buffer[:size
]
58 self
.log
.debug(self
.buffer)
59 self
.buffer = self
.buffer + self
.con
.read(self
.con
.in_waiting
)
60 if b
"\n" in self
.buffer:
61 size
= self
.buffer.index(b
"\n") + 1
62 self
.log
.debug("We have a whole line in the buffer")
63 self
.log
.debug(self
.buffer)
64 self
.log
.debug("We split at {}".format(size
))
65 data
= self
.buffer[:size
]
66 self
.buffer = self
.buffer[size
:]
68 self
.log
.debug(self
.buffer)
73 return data
+ self
.con
.readline()
75 def back_at_prompt(self
):
80 # We need to use self.in_waiting because with self.con.in_waiting we get
81 # not the complete string
82 size
= len(self
.buffer) + self
.in_waiting
83 data
= self
.peek(size
)
86 if self
.back_at_prompt_pattern
== None:
87 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
88 self
.back_at_prompt_pattern
= re
.compile(r
"^\[{}@.+\]#".format(self
.username
), re
.MULTILINE
)
90 if self
.back_at_prompt_pattern
.search(data
.decode()):
95 def log_console_line(self
, line
):
96 self
.log
.debug("Get in function log_console_line()")
97 self
.log_output
.info(line
)
98 sys
.stdout
.write(line
)
101 def in_waiting(self
):
102 in_waiting_before
= 0
105 while in_waiting_before
!= self
.con
.in_waiting
:
106 in_waiting_before
= self
.con
.in_waiting
109 return self
.con
.in_waiting
111 def line_in_buffer(self
):
112 if b
"\n" in self
.buffer:
117 def print_lines_in_buffer(self
):
119 self
.log
.debug("Fill buffer ...")
120 self
.peek(len(self
.buffer) + self
.in_waiting
)
121 self
.log
.debug("Current buffer length: {}".format(len(self
.buffer)))
122 if self
.line_in_buffer() == True:
123 while self
.line_in_buffer() == True:
124 data
= self
.readline()
125 self
.log_console_line(data
.decode())
127 self
.log
.debug("We have printed all lines in the buffer")
130 def login(self
, password
):
131 if self
.username
== None:
132 self
.log
.error("Username cannot be blank")
135 self
.print_lines_in_buffer()
137 # Hit enter to see what we get
138 self
.con
.write(b
'\n')
139 # We get two new lines \r\n ?
140 data
= self
.readline()
141 self
.log_console_line(data
.decode())
143 self
.print_lines_in_buffer()
145 if self
.back_at_prompt():
146 self
.log
.debug("We are already logged in.")
149 # Read all line till we get login:
151 # We need to use self.in_waiting because with self.con.in_waiting we get
152 # not the complete string
153 size
= len(self
.buffer) + self
.in_waiting
154 data
= self
.peek(size
)
156 pattern
= r
"^.*login: "
157 pattern
= re
.compile(pattern
)
159 if pattern
.search(data
.decode()):
162 self
.log
.debug("The pattern does not match")
163 self
.log
.debug(self
.peek(len(self
.buffer) + self
.in_waiting
))
164 self
.log_console_line(self
.readline().decode())
167 string
= "{}\n".format(self
.username
)
168 self
.con
.write(string
.encode())
170 # read the login out of the buffer
171 data
= self
.readline()
172 self
.log
.debug("This is the login:{}".format(data
))
173 self
.log_console_line(data
.decode())
175 # We need to wait her till we get the full string "Password:"
176 #This is useless but self.in_waiting will wait the correct amount of time
177 size
= self
.in_waiting
179 string
= "{}\n".format(password
)
180 self
.con
.write(string
.encode())
182 # Print the 'Password:' line
183 data
= self
.readline()
184 self
.log_console_line(data
.decode())
186 while not self
.back_at_prompt():
187 # This will fail if the login failed so we need to look for the failed keyword
188 data
= self
.readline()
189 self
.log_console_line(data
.decode())
193 def write(self
, string
):
194 self
.log
.debug(string
)
195 self
.con
.write(string
.encode())
198 def command(self
, command
):
199 self
.write("{}; echo \"END: $?\"\n".format(command
))
201 # We need to read out the prompt for this command first
202 # If we do not do this we will break the loop immediately
203 # because the prompt for this command is still in the buffer
204 data
= self
.readline()
205 self
.log_console_line(data
.decode())
207 while not self
.back_at_prompt():
208 data
= self
.readline()
209 self
.log_console_line(data
.decode())
211 # We saved our exit code in data (the last line)
212 self
.log
.debug(data
.decode())
213 data
= data
.decode().replace("END: ", "")
215 self
.log
.debug(data
.strip())