]>
git.ipfire.org Git - nitsi.git/blob - src/nitsi/serial_connection.py
11 from nitsi
.logger
import TestFormatter
13 logger
= logging
.getLogger("nitsi.serial")
15 class serial_connection():
16 def __init__(self
, device
, username
=None, log_file
=None, name
=None):
18 self
.back_at_prompt_pattern
= None
19 self
.username
= username
21 self
.log_file
= log_file
22 self
.log
= logger
.getChild(name
)
23 self
.log
.setLevel(logging
.INFO
)
24 self
.con
= serial
.Serial(device
)
26 self
.log_output
= self
.log
.getChild("output")
27 log_file_handler
= logging
.FileHandler(self
.log_file
)
28 log_file_handler
.setLevel(logging
.INFO
)
29 log_file_handler
.terminator
= ""
30 formatter
= TestFormatter(name
=self
.name
, start_time
=None)
31 log_file_handler
.setFormatter(formatter
)
32 self
.log_output
.addHandler(log_file_handler
)
34 def read(self
, size
=1):
35 if len(self
.buffer) >= size
:
36 # throw away first size bytes in buffer
37 data
= self
.buffer[:size
]
38 # Set the buffer to the non used bytes
39 self
.buffer = self
.buffer[size
:]
43 # Set the size to the value we have to read now
44 size
= size
- len(self
.buffer)
45 # Set the buffer empty
47 return data
+ self
.con
.read(size
)
49 def peek(self
, size
=1):
50 if len(self
.buffer) <= size
:
51 self
.buffer += self
.con
.read(size
=size
- len(self
.buffer))
53 return self
.buffer[:size
]
56 self
.log
.debug(self
.buffer)
57 self
.buffer = self
.buffer + self
.con
.read(self
.con
.in_waiting
)
58 if b
"\n" in self
.buffer:
59 size
= self
.buffer.index(b
"\n") + 1
60 self
.log
.debug("We have a whole line in the buffer")
61 self
.log
.debug(self
.buffer)
62 self
.log
.debug("We split at {}".format(size
))
63 data
= self
.buffer[:size
]
64 self
.buffer = self
.buffer[size
:]
66 self
.log
.debug(self
.buffer)
71 return data
+ self
.con
.readline()
73 def back_at_prompt(self
):
78 # We need to use self.in_waiting because with self.con.in_waiting we get
79 # not the complete string
80 size
= len(self
.buffer) + self
.in_waiting
81 data
= self
.peek(size
)
84 if self
.back_at_prompt_pattern
== None:
85 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
86 self
.back_at_prompt_pattern
= re
.compile(r
"^\[{}@.+\]#".format(self
.username
), re
.MULTILINE
)
88 if self
.back_at_prompt_pattern
.search(data
.decode()):
93 def log_console_line(self
, line
):
94 self
.log
.debug("Get in function log_console_line()")
95 self
.log_output
.info(line
)
96 sys
.stdout
.write(line
)
100 in_waiting_before
= 0
103 while in_waiting_before
!= self
.con
.in_waiting
:
104 in_waiting_before
= self
.con
.in_waiting
107 return self
.con
.in_waiting
109 def line_in_buffer(self
):
110 if b
"\n" in self
.buffer:
115 def print_lines_in_buffer(self
):
117 self
.log
.debug("Fill buffer ...")
118 self
.peek(len(self
.buffer) + self
.in_waiting
)
119 self
.log
.debug("Current buffer length: {}".format(len(self
.buffer)))
120 if self
.line_in_buffer() == True:
121 while self
.line_in_buffer() == True:
122 data
= self
.readline()
123 self
.log_console_line(data
.decode())
125 self
.log
.debug("We have printed all lines in the buffer")
128 def login(self
, password
):
129 if self
.username
== None:
130 self
.log
.error("Username cannot be blank")
133 self
.print_lines_in_buffer()
135 # Hit enter to see what we get
136 self
.con
.write(b
'\n')
137 # We get two new lines \r\n ?
138 data
= self
.readline()
139 self
.log_console_line(data
.decode())
141 self
.print_lines_in_buffer()
143 if self
.back_at_prompt():
144 self
.log
.debug("We are already logged in.")
147 # Read all line till we get login:
149 # We need to use self.in_waiting because with self.con.in_waiting we get
150 # not the complete string
151 size
= len(self
.buffer) + self
.in_waiting
152 data
= self
.peek(size
)
154 pattern
= r
"^.*login: "
155 pattern
= re
.compile(pattern
)
157 if pattern
.search(data
.decode()):
160 self
.log
.debug("The pattern does not match")
161 self
.log
.debug(self
.peek(len(self
.buffer) + self
.in_waiting
))
162 self
.log_console_line(self
.readline().decode())
165 string
= "{}\n".format(self
.username
)
166 self
.con
.write(string
.encode())
168 # read the login out of the buffer
169 data
= self
.readline()
170 self
.log
.debug("This is the login:{}".format(data
))
171 self
.log_console_line(data
.decode())
173 # We need to wait her till we get the full string "Password:"
174 #This is useless but self.in_waiting will wait the correct amount of time
175 size
= self
.in_waiting
177 string
= "{}\n".format(password
)
178 self
.con
.write(string
.encode())
180 # Print the 'Password:' line
181 data
= self
.readline()
182 self
.log_console_line(data
.decode())
184 while not self
.back_at_prompt():
185 # This will fail if the login failed so we need to look for the failed keyword
186 data
= self
.readline()
187 self
.log_console_line(data
.decode())
191 def write(self
, string
):
192 self
.log
.debug(string
)
193 self
.con
.write(string
.encode())
196 def command(self
, command
):
197 self
.write("{}; echo \"END: $?\"\n".format(command
))
199 # We need to read out the prompt for this command first
200 # If we do not do this we will break the loop immediately
201 # because the prompt for this command is still in the buffer
202 data
= self
.readline()
203 self
.log_console_line(data
.decode())
205 while not self
.back_at_prompt():
206 data
= self
.readline()
207 self
.log_console_line(data
.decode())
209 # We saved our exit code in data (the last line)
210 self
.log
.debug(data
.decode())
211 data
= data
.decode().replace("END: ", "")
213 self
.log
.debug(data
.strip())