# Source: git.ipfire.org Git - people/ms/nitsi.git / blob - src/nitsi/serial_connection.py
# Package-level logger for the serial console code; each connection derives
# its own child logger from it.
log = logging.getLogger("nitsi.serial")
class serial_connection():
    """Drive a login shell over a serial console.

    Incoming bytes are staged in ``self.buffer`` so that callers can look
    ahead (``peek``) and split the stream into lines (``readline``) before
    the data is consumed.  Every console line is sent to the per-connection
    "output" logger (which writes to ``log_file``) and echoed to stdout.

    NOTE(review): the listing this class was recovered from had lost its
    indentation and several lines (branch keywords, ``return`` statements,
    the pauses inside ``in_waiting``).  Spots where control flow had to be
    inferred are marked ``NOTE(review)`` below -- confirm them against the
    upstream repository.
    """

    # Seconds to pause between two polls of the port in ``in_waiting``.
    # NOTE(review): the pause lines were missing from the listing; 0.5 s is
    # an assumed default -- confirm.
    settle_interval = 0.5

    def __init__(self, device, username=None, log_file=None, name=None,
                 log_start_time=None, longest_machine_name=10):
        """Open the serial device and set up logging for this connection.

        Args:
            device: Port identifier handed to ``serial.Serial``.
            username: Account used by ``login`` and embedded in the prompt
                pattern.
            log_file: Path of the file that receives the console output.
            name: Suffix of the child logger; also passed to the formatter.
            log_start_time: Forwarded to ``logger.TestFormatter``.
            longest_machine_name: Forwarded to ``logger.TestFormatter``.
        """
        # Bytes already read from the port but not yet handed to a caller.
        self.buffer = b""
        # Compiled lazily in back_at_prompt() because it embeds the username.
        self.back_at_prompt_pattern = None
        self.username = username
        self.name = name
        self.log_file = log_file
        self.log = log.getChild(name)
        self.log.setLevel(logging.INFO)
        self.con = serial.Serial(device)

        self.log_output = self.log.getChild("output")
        log_file_handler = logging.FileHandler(self.log_file)
        log_file_handler.setLevel(logging.INFO)
        # Console lines keep the line ending they were read with, so the
        # handler must not append one of its own.
        log_file_handler.terminator = ""
        formatter = logger.TestFormatter(
            name=self.name,
            start_time=log_start_time,
            longest_machine_name=longest_machine_name,
        )
        log_file_handler.setFormatter(formatter)
        self.log_output.addHandler(log_file_handler)

    def read(self, size=1):
        """Consume and return ``size`` bytes, serving the buffer first."""
        if len(self.buffer) >= size:
            # The buffer alone satisfies the request; keep the unused rest.
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return data

        # Hand out everything buffered and read only the missing bytes
        # from the port.
        data = self.buffer
        size = size - len(self.buffer)
        self.buffer = b""
        return data + self.con.read(size)

    def peek(self, size=1):
        """Return the first ``size`` bytes without consuming them.

        Missing bytes are read from the port and appended to the buffer.
        """
        if len(self.buffer) < size:
            self.buffer += self.con.read(size=size - len(self.buffer))

        return self.buffer[:size]

    def readline(self):
        """Consume and return one line (up to and including ``\\n``).

        Everything currently waiting on the port is pulled into the buffer
        first.  If the buffer then holds no complete line, the buffered
        bytes are returned together with whatever ``self.con.readline()``
        yields.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at %s", size)
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(self.buffer)
            return data

        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True when the pending data is a shell prompt for the user.

        The prompt is expected to look like ``[<username>@...]#``.
        """
        # NOTE(review): inferred guard.  The prompt starts with "[", so any
        # other first byte means we are not there yet and we can skip the
        # settle wait of in_waiting.
        if self.peek() != b"[":
            return False

        # We need to use self.in_waiting because with self.con.in_waiting we
        # do not get the complete string.
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        # NOTE(review): inferred guard.  A complete line in the buffer still
        # has to be consumed by the caller; the MULTILINE pattern below
        # would otherwise match a prompt that follows unread output.
        if self.line_in_buffer():
            return False

        if self.back_at_prompt_pattern is None:
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(self.username)),
                re.MULTILINE,
            )

        return self.back_at_prompt_pattern.search(data.decode()) is not None

    def log_console_line(self, line):
        """Send one decoded console line to the output logger and stdout."""
        self.log.debug("Get in function log_console_line()")
        self.log_output.info(line)
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Number of bytes waiting on the port once the count stops changing.

        Polls ``self.con.in_waiting`` until two consecutive polls agree,
        pausing ``settle_interval`` seconds before each poll.
        """
        import time

        in_waiting_before = 0
        time.sleep(self.settle_interval)

        while in_waiting_before != self.con.in_waiting:
            in_waiting_before = self.con.in_waiting
            time.sleep(self.settle_interval)

        return self.con.in_waiting

    def line_in_buffer(self):
        """Return True if the buffer contains at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Log every complete line that is buffered or waiting on the port.

        NOTE(review): the surrounding loop is inferred -- the buffer is
        refilled and drained until a refill yields no complete line.
        """
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: %s", len(self.buffer))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                return

            while self.line_in_buffer():
                data = self.readline()
                self.log_console_line(data.decode())

    def login(self, password):
        """Log in as ``self.username`` with ``password``.

        Returns:
            False if no username is configured, True once the shell prompt
            has been reached (NOTE(review): return values inferred).

        A failed login is not detected: the final loop keeps reading lines
        until a prompt shows up.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        self.con.flush()
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(data.decode())

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all lines until we get "login: "
        pattern = re.compile(r"^.*login: ")
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting
            # we do not get the complete string.
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if pattern.search(data.decode()):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self.readline().decode())

        string = "{}\n".format(self.username)
        self.con.write(string.encode())
        # Make sure the username is on the wire before waiting for the echo.
        self.con.flush()

        # Read the login line out of the buffer
        data = self.readline()
        self.log.debug("This is the login:%s", data)
        self.log_console_line(data.decode())

        # Wait here until the full "Password:" string has arrived.  The value
        # itself is not needed; reading the property waits for the port to
        # settle.
        self.in_waiting

        string = "{}\n".format(password)
        self.con.write(string.encode())
        self.con.flush()

        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(data.decode())

        while not self.back_at_prompt():
            # This will fail if the login failed, so we would need to look
            # for the failure keyword here.
            data = self.readline()
            self.log_console_line(data.decode())

        return True

    def write(self, string):
        """Encode ``string`` and send it to the serial port."""
        self.log.debug(string)
        self.con.write(string.encode())
        self.con.flush()

    def command(self, command):
        """Run ``command`` in the remote shell and return its exit status.

        The command is sent with ``echo "END: $?"`` appended, so the last
        line before the next prompt carries the exit code.

        Returns:
            The exit code as a stripped string (NOTE(review): the return
            statement is inferred from the "exit code" comment).
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first.
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer.
        data = self.readline()
        self.log_console_line(data.decode())

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(data.decode())

        # We saved our exit code in data (the last line)
        self.log.debug(data.decode())
        data = data.decode().replace("END: ", "")
        self.log.debug(data.strip())
        return data.strip()