]>
git.ipfire.org Git - people/ms/nitsi.git/blob - serial_connection.py
11 logger
= logging
.getLogger("nitsi.serial")
13 class serial_connection():
14 def __init__(self
, device
, username
=None):
16 self
.back_at_prompt_pattern
= None
17 self
.username
= username
18 self
.log
= logger
.getChild(os
.path
.basename(device
))
19 self
.log
.setLevel(logging
.INFO
)
20 self
.con
= serial
.Serial(device
)
22 def read(self
, size
=1):
23 if len(self
.buffer) >= size
:
24 # throw away first size bytes in buffer
25 data
= self
.buffer[:size
]
26 # Set the buffer to the non used bytes
27 self
.buffer = self
.buffer[size
:]
31 # Set the size to the value we have to read now
32 size
= size
- len(self
.buffer)
33 # Set the buffer empty
35 return data
+ self
.con
.read(size
)
37 def peek(self
, size
=1):
38 if len(self
.buffer) <= size
:
39 self
.buffer += self
.con
.read(size
=size
- len(self
.buffer))
41 return self
.buffer[:size
]
44 self
.log
.debug(self
.buffer)
45 self
.buffer = self
.buffer + self
.con
.read(self
.con
.in_waiting
)
46 if b
"\n" in self
.buffer:
47 size
= self
.buffer.index(b
"\n") + 1
48 self
.log
.debug("We have a whole line in the buffer")
49 self
.log
.debug(self
.buffer)
50 self
.log
.debug("We split at {}".format(size
))
51 data
= self
.buffer[:size
]
52 self
.buffer = self
.buffer[size
:]
54 self
.log
.debug(self
.buffer)
59 return data
+ self
.con
.readline()
61 def back_at_prompt(self
):
66 # We need to use self.in_waiting because with self.con.in_waiting we get
67 # not the complete string
68 size
= len(self
.buffer) + self
.in_waiting
69 data
= self
.peek(size
)
72 if self
.back_at_prompt_pattern
== None:
73 #self.back_at_prompt_pattern = r"^\[{}@.+\]#".format(self.username)
74 self
.back_at_prompt_pattern
= re
.compile(r
"^\[{}@.+\]#".format(self
.username
), re
.MULTILINE
)
76 if self
.back_at_prompt_pattern
.search(data
.decode()):
81 def log_console_line(self
, line
):
82 self
.log
.debug("Get in function log_console_line()")
83 sys
.stdout
.write(line
)
90 while in_waiting_before
!= self
.con
.in_waiting
:
91 in_waiting_before
= self
.con
.in_waiting
94 return self
.con
.in_waiting
96 def line_in_buffer(self
):
97 if b
"\n" in self
.buffer:
102 def print_lines_in_buffer(self
):
104 self
.log
.debug("Fill buffer ...")
105 self
.peek(len(self
.buffer) + self
.in_waiting
)
106 self
.log
.debug("Current buffer length: {}".format(len(self
.buffer)))
107 if self
.line_in_buffer() == True:
108 while self
.line_in_buffer() == True:
109 data
= self
.readline()
110 self
.log_console_line(data
.decode())
112 self
.log
.debug("We have printed all lines in the buffer")
115 def login(self
, password
):
116 if self
.username
== None:
117 self
.log
.error("Username cannot be blank")
120 self
.print_lines_in_buffer()
122 # Hit enter to see what we get
123 self
.con
.write(b
'\n')
124 # We get two new lines \r\n ?
125 data
= self
.readline()
126 self
.log_console_line(data
.decode())
128 self
.print_lines_in_buffer()
130 if self
.back_at_prompt():
131 self
.log
.debug("We are already logged in.")
134 # Read all line till we get login:
136 # We need to use self.in_waiting because with self.con.in_waiting we get
137 # not the complete string
138 size
= len(self
.buffer) + self
.in_waiting
139 data
= self
.peek(size
)
141 pattern
= r
"^.*login: "
142 pattern
= re
.compile(pattern
)
144 if pattern
.search(data
.decode()):
147 self
.log
.debug("The pattern does not match")
148 self
.log
.debug(self
.peek(len(self
.buffer) + self
.in_waiting
))
149 self
.log_console_line(self
.readline().decode())
152 string
= "{}\n".format(self
.username
)
153 self
.con
.write(string
.encode())
155 # read the login out of the buffer
156 data
= self
.readline()
157 self
.log
.debug("This is the login:{}".format(data
))
158 self
.log_console_line(data
.decode())
160 # We need to wait her till we get the full string "Password:"
161 #This is useless but self.in_waiting will wait the correct amount of time
162 size
= self
.in_waiting
164 string
= "{}\n".format(password
)
165 self
.con
.write(string
.encode())
167 # Print the 'Password:' line
168 data
= self
.readline()
169 self
.log_console_line(data
.decode())
171 while not self
.back_at_prompt():
172 # This will fail if the login failed so we need to look for the failed keyword
173 data
= self
.readline()
174 self
.log_console_line(data
.decode())
178 def write(self
, string
):
179 self
.log
.debug(string
)
180 self
.con
.write(string
.encode())
183 def command(self
, command
):
184 self
.write("{}; echo \"END: $?\"\n".format(command
))
186 # We need to read out the prompt for this command first
187 # If we do not do this we will break the loop immediately
188 # because the prompt for this command is still in the buffer
189 data
= self
.readline()
190 self
.log_console_line(data
.decode())
192 while not self
.back_at_prompt():
193 data
= self
.readline()
194 self
.log_console_line(data
.decode())
196 # We saved our exit code in data (the last line)
197 self
.log
.debug(data
.decode())
198 data
= data
.decode().replace("END: ", "")
200 self
.log
.debug(data
.strip())