]>
git.ipfire.org Git - nitsi.git/blob - serial_connection.py
511d34927aeff9e7702a0f408ceed5b1454356a5
# Module-level logger; each connection derives a per-device child from it.
logger = logging.getLogger("nitsi.serial")
class serial_connection():
    """Drive a login shell over a serial console.

    Bytes already pulled from the port but not yet consumed are kept in
    ``self.buffer`` so that callers can peek at pending output, read it
    line by line, and detect when the shell prompt has come back.

    NOTE(review): this class was reconstructed from a damaged listing in
    which several original lines were missing.  Statements marked
    "assumed" below were inferred from how the methods are used and
    should be confirmed against the original file.
    """

    # Seconds to wait between two polls of the port while waiting for the
    # amount of pending input to stop changing (see ``in_waiting``).
    # NOTE(review): the original delay was not visible; 0.5 is assumed.
    settle_interval = 0.5

    # Text that marks a rejected login.
    # NOTE(review): assumed to be the message printed by the remote login
    # program; adjust if the target system words it differently.
    LOGIN_FAILED_MARKER = b"Login incorrect"

    def __init__(self, device, username=None):
        """Open *device* and prepare an empty receive buffer.

        device   -- path of the serial device; its basename names the logger
        username -- account used by ``login()`` and for prompt detection
        """
        self.back_at_prompt_pattern = None
        # Every other method reads self.buffer, so it must exist up front.
        self.buffer = b""
        self.username = username
        self.log = logger.getChild(os.path.basename(device))
        self.con = serial.Serial(device)

    @staticmethod
    def _decode(data):
        """Decode console bytes without raising on invalid sequences."""
        return data.decode(errors="replace")

    def read(self, size=1):
        """Return *size* bytes, taking them from the buffer first."""
        if len(self.buffer) >= size:
            # Hand out the first size bytes, keep the unused rest buffered
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return data

        # The buffer is too short: use all of it and read the rest
        data = self.buffer
        size = size - len(self.buffer)
        self.buffer = b""
        return data + self.con.read(size)

    def peek(self, size=1):
        """Return the first *size* bytes without consuming them."""
        missing = size - len(self.buffer)
        if missing > 0:
            self.buffer += self.con.read(missing)

        return self.buffer[:size]

    def readline(self):
        """Return the next line, including its trailing newline.

        Everything currently pending on the port is moved into the buffer
        first.  If the buffer then holds no complete line, the rest of the
        line is read from the port directly.
        """
        self.log.debug(self.buffer)
        self.buffer = self.buffer + self.con.read(self.con.in_waiting)
        if b"\n" in self.buffer:
            size = self.buffer.index(b"\n") + 1
            self.log.debug("We have a whole line in the buffer")
            self.log.debug(self.buffer)
            self.log.debug("We split at {}".format(size))
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            self.log.debug(self.buffer)
            return data

        # Only a partial line is buffered: hand it out with the remainder
        data = self.buffer
        self.buffer = b""
        return data + self.con.readline()

    def back_at_prompt(self):
        """Return True when only the shell prompt is left to read.

        While complete lines are still pending this returns False, so that
        callers looping on ``readline()`` consume that output before the
        prompt is reported.
        """
        # We need to use self.in_waiting because with self.con.in_waiting we get
        # not the complete string
        size = len(self.buffer) + self.in_waiting
        data = self.peek(size)

        # A newline means there are unread lines in front of any prompt
        if b"\n" in data:
            return False

        if self.back_at_prompt_pattern is None:
            # Escape the user name so it is matched literally
            self.back_at_prompt_pattern = re.compile(
                r"^\[{}@.+\]#".format(re.escape(str(self.username))),
                re.MULTILINE)

        return self.back_at_prompt_pattern.search(self._decode(data)) is not None

    def log_console_line(self, line):
        """Write one line of console output to stdout."""
        self.log.debug("Get in function log_console_line()")
        sys.stdout.write(line)
        # Flush so console output shows up as it arrives
        sys.stdout.flush()

    @property
    def in_waiting(self):
        """Number of bytes pending on the port once input has settled.

        Polls ``self.con.in_waiting`` every ``settle_interval`` seconds
        until two successive polls agree, then returns the current value.
        """
        import time

        in_waiting_before = 0
        time.sleep(self.settle_interval)

        while in_waiting_before != self.con.in_waiting:
            in_waiting_before = self.con.in_waiting
            time.sleep(self.settle_interval)

        return self.con.in_waiting

    def line_in_buffer(self):
        """Return True if the buffer holds at least one complete line."""
        return b"\n" in self.buffer

    def print_lines_in_buffer(self):
        """Print every complete line that is pending, then return."""
        while True:
            self.log.debug("Fill buffer ...")
            self.peek(len(self.buffer) + self.in_waiting)
            self.log.debug("Current buffer length: {}".format(len(self.buffer)))
            if not self.line_in_buffer():
                self.log.debug("We have printed all lines in the buffer")
                break

            while self.line_in_buffer():
                data = self.readline()
                self.log_console_line(self._decode(data))

    def login(self, password):
        """Log in as ``self.username`` with *password*.

        Returns True once the shell prompt is seen (also when a session is
        already open) and False when no user name is set or the login is
        rejected.
        NOTE(review): the return values are assumed; the original return
        statements were not visible.
        """
        if self.username is None:
            self.log.error("Username cannot be blank")
            return False

        self.print_lines_in_buffer()

        # Hit enter to see what we get
        self.con.write(b'\n')
        # We get two new lines \r\n ?
        data = self.readline()
        self.log_console_line(self._decode(data))

        self.print_lines_in_buffer()

        if self.back_at_prompt():
            self.log.debug("We are already logged in.")
            return True

        # Read all line till we get login:
        pattern = re.compile(r"^.*login: ")
        while True:
            # We need to use self.in_waiting because with self.con.in_waiting we get
            # not the complete string
            size = len(self.buffer) + self.in_waiting
            data = self.peek(size)

            if pattern.search(self._decode(data)):
                break

            self.log.debug("The pattern does not match")
            self.log.debug(self.peek(len(self.buffer) + self.in_waiting))
            self.log_console_line(self._decode(self.readline()))

        string = "{}\n".format(self.username)
        self.con.write(string.encode())

        # read the login out of the buffer
        data = self.readline()
        self.log.debug("This is the login:{}".format(data))
        self.log_console_line(self._decode(data))

        # We need to wait here till we get the full string "Password:".
        # The value is not used; reading self.in_waiting waits until the
        # input has settled.
        self.in_waiting

        string = "{}\n".format(password)
        self.con.write(string.encode())

        # Print the 'Password:' line
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))
            # Without this check a rejected login would wait forever for
            # a prompt that never comes
            if self.LOGIN_FAILED_MARKER in data:
                self.log.error("Login failed")
                return False

        return True

    def write(self, string):
        """Encode *string* and send it to the port."""
        self.log.debug(string)
        self.con.write(string.encode())

    def command(self, command):
        """Run *command* in the remote shell and return its exit status.

        The command is followed by ``echo "END: $?"`` so that the last
        line printed before the prompt carries the exit status.  That line
        is returned as a string with ``"END: "`` removed and surrounding
        whitespace stripped.
        NOTE(review): returning the value is assumed; the original return
        statement was not visible.
        """
        self.write("{}; echo \"END: $?\"\n".format(command))

        # We need to read out the prompt for this command first
        # If we do not do this we will break the loop immediately
        # because the prompt for this command is still in the buffer
        data = self.readline()
        self.log_console_line(self._decode(data))

        while not self.back_at_prompt():
            data = self.readline()
            self.log_console_line(self._decode(data))

        # We saved our exit code in data (the last line)
        self.log.debug(self._decode(data))
        data = self._decode(data).replace("END: ", "")

        self.log.debug(data.strip())
        return data.strip()