# git.ipfire.org Git - nitsi.git/blob - serial_connection.py
class serial_connection():
    """Buffered, line-oriented helper around a ``serial.Serial`` console.

    Bytes pulled from the port are kept in ``self.buffer`` so callers can
    peek at pending data, read whole lines, detect a shell prompt of the
    form ``[<username>@...]#``, log in and run commands.

    NOTE(review): this class was restored from a lossy web scrape in which
    several source lines were missing. Every line that had to be inferred
    is marked with ``NOTE(review)`` below -- confirm against upstream.
    """

    # Seconds slept between polls of the port's ``in_waiting`` counter.
    # NOTE(review): the delay value is not visible in the scraped source;
    # 0.5 s is an assumption.
    _settle_delay = 0.5

    def __init__(self, device, username=None):
        """Open *device* and prepare an empty receive buffer.

        Args:
            device: Port name handed unchanged to ``serial.Serial``.
            username: Account used by ``login()`` and for prompt detection.
        """
        import logging

        # NOTE(review): buffer initialisation is inferred; every method
        # treats ``self.buffer`` as a ``bytes`` object.
        self.buffer = b""
        # Compiled lazily by back_at_prompt().
        self.back_at_prompt_pattern = None
        self.username = username
        # NOTE(review): the scraped source does not show where ``self.log``
        # is assigned; a stdlib logger is used as a stand-in.
        self.log = logging.getLogger(__name__)
        self.con = serial.Serial(device)

    def read(self, size=1):
        """Return *size* bytes, served from the buffer first, then the port."""
        if len(self.buffer) >= size:
            # throw away first size bytes in buffer
            data = self.buffer[:size]
            # Set the buffer to the non used bytes
            self.buffer = self.buffer[size:]
            return data

        # NOTE(review): this branch's first and third lines are inferred
        # from the surviving comments and the final return expression.
        data = self.buffer
        # Set the size to the value we have to read now
        size = size - len(self.buffer)
        # Set the buffer empty
        self.buffer = b""
        return data + self.con.read(size)

    def peek(self, size=1):
        """Return the first *size* buffered bytes without consuming them.

        If fewer than *size* bytes are buffered, the missing amount is read
        from the port and appended to the buffer first.
        """
        # Strictly "less than": when the buffer already holds exactly
        # ``size`` bytes there is nothing to fetch, so skip the empty read.
        if len(self.buffer) < size:
            self.buffer += self.con.read(size=size - len(self.buffer))

        return self.buffer[:size]

    def readline(self):
        """Return the next line (including its ``\\n``) as bytes.

        NOTE(review): the ``def`` line and the no-newline fallback
        assignments were missing from the scraped source and are inferred
        from the call sites and from the final return expression.
        """
        self.log.debug(self.buffer)
        # Pull in everything the port currently has pending.
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at {}".format(size))
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(self.buffer)
            return data

        # No complete line buffered: hand over what we have and let the
        # port deliver the remainder of the line.
        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True when the pending data is the shell prompt.

        The prompt is recognised as ``[<username>@...]#`` at the start of a
        line. While a complete line is still queued in the buffer this
        returns False, so callers consume that output first.
        """
        # We need to use self.in_waiting because with self.con.in_waiting we get
        # not the complete string
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        # The pattern is MULTILINE, so without this guard a prompt that
        # follows unread output lines would already count as "at prompt".
        if self.line_in_buffer():
            return False

        if self.back_at_prompt_pattern is None:
            # Escape the username so characters such as "." or "+" are
            # matched literally instead of acting as regex operators.
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE,
            )

        # NOTE(review): the True/False returns are inferred from how the
        # callers use the result (``if`` / ``while not``).
        return self.back_at_prompt_pattern.search(data.decode()) is not None

    def log_console_line(self, line):
        """Write one decoded console line to standard output."""
        self.log.debug("Get in function log_console_line()")
        sys.stdout.write(line)

    @property
    def in_waiting(self):
        """Number of bytes pending on the port once the count stops changing.

        NOTE(review): the ``@property``/``def`` lines, the initial value and
        the sleeps were missing from the scraped source; they are inferred
        from the surviving loop and from comments elsewhere in the class
        saying that ``self.in_waiting`` waits for the complete string.
        """
        import time

        in_waiting_before = 0
        time.sleep(self._settle_delay)

        while in_waiting_before != self.con.in_waiting:
            in_waiting_before = self.con.in_waiting
            time.sleep(self._settle_delay)

        return self.con.in_waiting

    def line_in_buffer(self):
        """Return True if the buffer holds at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Print every complete line that is buffered or still arriving.

        NOTE(review): the outer loop and its exit are inferred; the scraped
        source only shows the fill/drain statements and the final message.
        """
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: {}".format(len(self.buffer)))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                return

            while self.line_in_buffer():
                data = self.readline()
                self.log_console_line(data.decode())

    def login(self, password):
        """Log in on the console as ``self.username`` using *password*.

        Returns:
            False if no username is configured, True once the prompt is
            reached. NOTE(review): the return values and the loop/branch
            keywords were missing from the scraped source and are inferred.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(data.decode())

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Compiled once; the pattern does not change between iterations.
        pattern = re.compile(r"^.*login: ")

        # Read all line till we get login:
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting we get
            # not the complete string
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if pattern.search(data.decode()):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self.readline().decode())

        string = "{}\n".format(self.username)
        self.con.write(string.encode())

        # read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:{}".format(data))
        self.log_console_line(data.decode())

        # We need to wait here till we get the full string "Password:"
        # The value is unused; reading the property only provides the wait.
        self.in_waiting

        string = "{}\n".format(password)
        self.con.write(string.encode())

        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(data.decode())

        while not self.back_at_prompt():
            # This will fail if the login failed so we need to look for the failed keyword
            data = self.readline()
            self.log_console_line(data.decode())

        return True

    def write(self, string):
        """Log *string* and send it, encoded, to the port."""
        self.log.debug(string)
        self.con.write(string.encode())

    def command(self, command):
        """Run *command* on the console and print its output.

        The command is sent with ``; echo "END: $?"`` appended, so the last
        line read before the prompt returns carries the exit code.

        Returns:
            The last line with ``"END: "`` removed and whitespace stripped.
            NOTE(review): the ``return`` is inferred; the scraped source
            ends at the final debug call.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer
        data = self.readline()
        self.log_console_line(data.decode())

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(data.decode())

        # We saved our exit code in data (the last line)
        self.log.debug(data.decode())
        data = data.decode().replace("END: ", "")

        self.log.debug(data.strip())
        return data.strip()