# nitsi.git: src/nitsi/serial_connection.py (as published on git.ipfire.org)
# Module-level logger; every SerialConnection derives its own child logger
# from it.
log = logging.getLogger("nitsi.serial")
class SerialConnection():
    """Buffered, line-oriented access to the serial console of a machine.

    Bytes that were read from the port but not yet consumed are kept in
    ``self.buffer`` so that callers can look ahead (``peek``) before they
    decide whether to consume a full line (``readline``). On top of that
    the class offers helpers to log in on the console and to run a shell
    command and obtain its exit status.
    """

    # Seconds to sleep between two polls of the port while waiting for the
    # amount of pending input to stop changing (see ``in_waiting``).
    settle_interval = 0.5

    def __init__(self, device, username=None, log_file=None, name=None, log_start_time=None, longest_machine_name=10):
        """Open ``device`` and set up logging for the console output.

        Args:
            device: Serial device, handed to ``serial.Serial``.
            username: Account used by ``login`` and expected in the prompt.
            log_file: File receiving the console output; when ``None`` the
                output only goes to the stream handler.
            name: Machine name; used as child-logger name and passed to the
                formatter. ``Logger.getChild`` needs a string here.
            log_start_time: Passed through to ``logger.TestFormatter``.
            longest_machine_name: Passed through to ``logger.TestFormatter``.
        """
        # Bytes already read from the port but not yet handed to a caller.
        self.buffer = b""
        # Compiled lazily by back_at_prompt().
        self.back_at_prompt_pattern = None
        self.username = username
        self.name = name
        self.log_file = log_file
        self.log = log.getChild(name)
        self.log.setLevel(logging.INFO)
        self.con = serial.Serial(device)

        self.log_output = self.log.getChild("output")
        # Do not propagate the output to ancestor loggers as it looks ugly
        self.log_output.propagate = False

        formatter = logger.TestFormatter(name=self.name,
                                         start_time=log_start_time,
                                         longest_machine_name=longest_machine_name)

        handlers = []
        if self.log_file is not None:
            # Logging handler for file
            handlers.append(logging.FileHandler(self.log_file))
        # Logging handler for stream
        handlers.append(logging.StreamHandler())

        for handler in handlers:
            handler.setLevel(logging.INFO)
            # Lines are logged exactly as read from the console (including
            # their line ending), so the handler must not append its own.
            handler.terminator = ""
            handler.setFormatter(formatter)
            self.log_output.addHandler(handler)

    @staticmethod
    def _decode(data):
        """Decode console bytes without failing on undecodable input."""
        return data.decode(errors="replace")

    def read(self, size=1):
        """Consume and return ``size`` bytes, serving buffered bytes first."""
        if len(self.buffer) >= size:
            # The buffer alone satisfies the request; keep the unused rest.
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return data

        # Hand out the whole buffer and read only the missing bytes.
        data = self.buffer
        self.buffer = b""
        return data + self.con.read(size - len(data))

    def peek(self, size=1):
        """Return the next ``size`` bytes without consuming them."""
        if len(self.buffer) < size:
            self.buffer += self.con.read(size - len(self.buffer))

        return self.buffer[:size]

    def readline(self):
        """Consume and return one line, including its trailing newline.

        Everything pending on the port is pulled into the buffer first. If
        the buffer then holds no complete line, the buffered bytes are
        returned together with whatever ``self.con.readline()`` delivers.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at %s", size)
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(self.buffer)
            return data

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True when the pending input is a shell prompt.

        The prompt is expected to look like ``[<username>@...]#`` and must
        not be followed by a newline (otherwise it is a line of output that
        still has to be read).
        """
        self.log.debug("Check if we are back at prompt")
        data = self.peek()
        self.log.debug("First char in buffer is: '%s'", self._decode(data))
        # Cheap pre-check: a prompt always starts with "[".
        if data != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we get
        # not the complete string
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)
        self.log.debug("Data is: '%s'", data)

        # When we have an \n in the buffer we are not at the prompt,
        # instead we still have a line in the buffer
        if self.line_in_buffer():
            return False

        if self.back_at_prompt_pattern is None:
            # Escape the username so that it is matched literally.
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE)

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Send one line of console output to the output logger."""
        self.log.debug("Get in function log_console_line()")
        self.log_output.info(line)

    @property
    def in_waiting(self):
        """Number of bytes pending on the port once the input has settled.

        Polls ``self.con.in_waiting`` until two consecutive polls, taken
        ``settle_interval`` seconds apart, report the same value. Other
        methods rely on this both for the value and for the delay.
        """
        # Local import: the module's import block is not part of this class.
        import time

        previous = 0
        time.sleep(self.settle_interval)

        while previous != self.con.in_waiting:
            previous = self.con.in_waiting
            time.sleep(self.settle_interval)

        return previous

    def line_in_buffer(self):
        """Return True when the buffer holds at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Log every complete line that is currently available.

        The buffer is refilled after each batch of lines because more
        output may have arrived in the meantime; the method returns once a
        refill yields no complete line.
        """
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: %s", len(self.buffer))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                return

            while self.line_in_buffer():
                self.log_console_line(self._decode(self.readline()))

    def login(self, password):
        """Log in on the console as ``self.username``.

        Returns:
            True when a prompt was reached (including the case that a
            session was already open), False when no username is set.

        Note:
            A rejected login never produces a prompt, so the final loop
            does not terminate in that case (see the comment there).
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        self.log_console_line(self._decode(self.readline()))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines till we get login:
        login_prompt = re.compile(r"^.*login: ")
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting we get
            # not the complete string
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if login_prompt.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self._decode(self.readline()))

        # We can log in
        self.con.write("{}\n".format(self.username).encode())

        # read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:%s", data)
        self.log_console_line(self._decode(data))

        # We need to wait here till we get the full string "Password:".
        # The value is not needed; the property is read only because it
        # waits until the input has settled.
        self.in_waiting

        self.con.write("{}\n".format(password).encode())

        # Print the 'Password:' line
        self.log_console_line(self._decode(self.readline()))

        while not self.back_at_prompt():
            # This will fail if the login failed so we need to look for the failed keyword
            self.log_console_line(self._decode(self.readline()))

        return True

    def write(self, string):
        """Encode ``string`` and send it to the console."""
        self.log.debug(string)
        self.con.write(string.encode())

    def command(self, command):
        """Run ``command`` on the console and return its exit status.

        The exit status is obtained by appending ``echo "END: $?"`` to the
        command; it is returned as a string with the ``END: `` marker and
        surrounding whitespace removed.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        self.log.debug("We get a prompt so we should be in the last line")

        # We saved our exit code in data (the last line)
        text = self._decode(data)
        self.log.debug(text)
        status = text.replace("END: ", "").strip()
        self.log.debug(status)
        return status