]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/remotehost.py
tests: WPA2-PSK-FT AP and RIC
[thirdparty/hostap.git] / tests / hwsim / remotehost.py
1 # Host class
2 # Copyright (c) 2016, Qualcomm Atheros, Inc.
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6
7 import logging
8 import subprocess
9 import threading
10
11 logger = logging.getLogger()
12
13 def remote_compatible(func):
14 func.remote_compatible = True
15 return func
16
17 def execute_thread(command, reply):
18 cmd = ' '.join(command)
19 logger.debug("thread run: " + cmd)
20 try:
21 status = 0
22 buf = subprocess.check_output(command, stderr=subprocess.STDOUT)
23 except subprocess.CalledProcessError as e:
24 status = e.returncode
25 buf = e.output
26
27 logger.debug("thread cmd: " + cmd)
28 logger.debug("thread exit status: " + str(status))
29 logger.debug("thread exit buf: " + str(buf))
30 reply.append(status)
31 reply.append(buf)
32
33 class Host():
34 def __init__(self, host=None, ifname=None, port=None, name="", user="root"):
35 self.host = host
36 self.name = name
37 self.user = user
38 self.monitors = []
39 self.monitor_thread = None
40 self.logs = []
41 self.ifname = ifname
42 self.port = port
43 self.dev = None
44 if self.name == "" and host != None:
45 self.name = host
46
47 def local_execute(self, command):
48 logger.debug("execute: " + str(command))
49 try:
50 status = 0
51 buf = subprocess.check_output(command, stderr=subprocess.STDOUT)
52 except subprocess.CalledProcessError as e:
53 status = e.returncode
54 buf = e.output
55
56 logger.debug("status: " + str(status))
57 logger.debug("buf: " + str(buf))
58 return status, buf
59
60 def execute(self, command):
61 if self.host is None:
62 return self.local_execute(command)
63
64 cmd = ["ssh", self.user + "@" + self.host, ' '.join(command)]
65 _cmd = self.name + " execute: " + ' '.join(cmd)
66 logger.debug(_cmd)
67 try:
68 status = 0
69 buf = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
70 except subprocess.CalledProcessError as e:
71 status = e.returncode
72 buf = e.output
73
74 logger.debug(self.name + " status: " + str(status))
75 logger.debug(self.name + " buf: " + str(buf))
76 return status, buf
77
78 # async execute
79 def execute_run(self, command, res):
80 if self.host is None:
81 cmd = command
82 else:
83 cmd = ["ssh", self.user + "@" + self.host, ' '.join(command)]
84 _cmd = self.name + " execute_run: " + ' '.join(cmd)
85 logger.debug(_cmd)
86 t = threading.Thread(target = execute_thread, args=(cmd, res))
87 t.start()
88 return t
89
90 def wait_execute_complete(self, t, wait=None):
91 if wait == None:
92 wait_str = "infinite"
93 else:
94 wait_str = str(wait) + "s"
95
96 logger.debug(self.name + " wait_execute_complete(" + wait_str + "): ")
97 if t.isAlive():
98 t.join(wait)
99
100 def add_log(self, log_file):
101 self.logs.append(log_file)
102
103 def get_logs(self, local_log_dir=None):
104 for log in self.logs:
105 if local_log_dir:
106 self.local_execute(["scp", self.user + "@[" + self.host + "]:" + log, local_log_dir])
107 self.execute(["rm", log])
108 del self.logs[:]