]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/wpasupplicant.py
WPS ER: Allow UPnP interface to be forced
[thirdparty/hostap.git] / tests / hwsim / wpasupplicant.py
CommitLineData
1ae73b03
JM
1#!/usr/bin/python
2#
3# Python class for controlling wpa_supplicant
4# Copyright (c) 2013, Jouni Malinen <j@w1.fi>
5#
6# This software may be distributed under the terms of the BSD license.
7# See README for more details.
8
9import os
10import time
11import logging
c68f9a61 12import re
1ae73b03
JM
13import wpaspy
14
# Module-level logger used by the WpaSupplicant helper class.
logger = logging.getLogger(__name__)
# Directory that is joined with an interface name to form the path of that
# interface's wpa_supplicant control socket.
wpas_ctrl = '/var/run/wpa_supplicant'
17
class WpaSupplicant:
    """Helper for driving one wpa_supplicant interface over its control socket.

    Two wpaspy.Ctrl connections are opened to the socket found at
    os.path.join(wpas_ctrl, ifname): self.ctrl is used for commands and
    self.mon is attach()ed and polled for event messages.  self.group_ifname
    remembers the interface name parsed from the latest P2P-GROUP-STARTED
    event so that group-scoped commands can be sent to that interface.
    """

    def __init__(self, ifname):
        """Open the command and monitor connections for interface ifname."""
        self.ifname = ifname
        self.group_ifname = None
        path = os.path.join(wpas_ctrl, ifname)
        self.ctrl = wpaspy.Ctrl(path)
        self.mon = wpaspy.Ctrl(path)
        self.mon.attach()

    def request(self, cmd):
        """Log cmd and send it on the command connection; return the reply."""
        logger.debug(self.ifname + ": CTRL: " + cmd)
        return self.ctrl.request(cmd)

    def group_request(self, cmd):
        """Send cmd to the group interface, or to self.ifname if none is set.

        A separate connection is opened only when a group interface name is
        recorded and it differs from this object's own interface.
        """
        if self.group_ifname and self.group_ifname != self.ifname:
            logger.debug(self.group_ifname + ": CTRL: " + cmd)
            gctrl = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
            return gctrl.request(cmd)
        return self.request(cmd)

    def ping(self):
        """Return True if the PING command is answered with PONG."""
        return "PONG" in self.request("PING")

    def reset(self):
        """Issue FLUSH, clear ignore_old_scan_res and forget the group ifname."""
        self.request("FLUSH")
        self.request("SET ignore_old_scan_res 0")
        self.group_ifname = None

    def add_network(self):
        """Create a network block and return its numeric id.

        Raises Exception if the reply contains FAIL.
        """
        res = self.request("ADD_NETWORK")
        if "FAIL" in res:
            raise Exception("ADD_NETWORK failed")
        return int(res)

    def remove_network(self, id):
        """Remove network block id; raises Exception on FAIL."""
        res = self.request("REMOVE_NETWORK " + str(id))
        if "FAIL" in res:
            raise Exception("REMOVE_NETWORK failed")
        return None

    def set_network(self, id, field, value):
        """Set field of network id to value (sent verbatim, unquoted).

        Raises Exception on FAIL.
        """
        res = self.request("SET_NETWORK " + str(id) + " " + field + " " + value)
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def set_network_quoted(self, id, field, value):
        """Set field of network id to value wrapped in double quotes.

        Raises Exception on FAIL.
        """
        res = self.request("SET_NETWORK " + str(id) + " " + field + ' "' + value + '"')
        if "FAIL" in res:
            raise Exception("SET_NETWORK failed")
        return None

    def add_cred(self):
        """Create a credential block and return its numeric id.

        Raises Exception if the reply contains FAIL.
        """
        res = self.request("ADD_CRED")
        if "FAIL" in res:
            raise Exception("ADD_CRED failed")
        return int(res)

    def remove_cred(self, id):
        """Remove credential block id; raises Exception on FAIL."""
        res = self.request("REMOVE_CRED " + str(id))
        if "FAIL" in res:
            raise Exception("REMOVE_CRED failed")
        return None

    def set_cred(self, id, field, value):
        """Set field of credential id to value (sent verbatim, unquoted).

        Raises Exception on FAIL.
        """
        res = self.request("SET_CRED " + str(id) + " " + field + " " + value)
        if "FAIL" in res:
            raise Exception("SET_CRED failed")
        return None

    def set_cred_quoted(self, id, field, value):
        """Set field of credential id to value wrapped in double quotes.

        Raises Exception on FAIL.
        """
        res = self.request("SET_CRED " + str(id) + " " + field + ' "' + value + '"')
        if "FAIL" in res:
            raise Exception("SET_CRED failed")
        return None

    def select_network(self, id):
        """Issue SELECT_NETWORK for id; raises Exception on FAIL."""
        res = self.request("SELECT_NETWORK " + str(id))
        if "FAIL" in res:
            raise Exception("SELECT_NETWORK failed")
        return None

    def connect_network(self, id):
        """Select network id and wait up to 10 s for CTRL-EVENT-CONNECTED.

        Pending monitor events are drained before and after so that only
        events produced by this selection are considered.  Raises Exception
        if no connection event is seen in time.
        """
        self.dump_monitor()
        self.select_network(id)
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
        if ev is None:
            raise Exception("Association with the AP timed out")
        self.dump_monitor()

    def _parse_status(self, res):
        """Turn a STATUS reply (name=value per line) into a dict.

        Lines that do not contain '=' (for example an empty line or a bare
        FAIL reply) are skipped instead of raising ValueError.
        """
        vals = dict()
        for line in res.splitlines():
            if '=' not in line:
                continue
            name, value = line.split('=', 1)
            vals[name] = value
        return vals

    def get_status(self):
        """Return the STATUS reply of this interface as a dict of strings."""
        return self._parse_status(self.request("STATUS"))

    def get_status_field(self, field):
        """Return one STATUS value, or None if the field is not reported."""
        return self.get_status().get(field)

    def get_group_status(self):
        """Return the STATUS reply of the group interface as a dict."""
        return self._parse_status(self.group_request("STATUS"))

    def get_group_status_field(self, field):
        """Return one group STATUS value, or None if it is not reported."""
        return self.get_group_status().get(field)

    def p2p_dev_addr(self):
        """Return the p2p_device_address STATUS field (None if absent)."""
        return self.get_status_field("p2p_device_address")

    def p2p_interface_addr(self):
        """Return the address STATUS field of the group interface."""
        return self.get_group_status_field("address")

    def p2p_listen(self):
        """Issue P2P_LISTEN and return the raw reply."""
        return self.request("P2P_LISTEN")

    def p2p_find(self, social=False):
        """Issue P2P_FIND (with type=social when social is true)."""
        if social:
            return self.request("P2P_FIND type=social")
        return self.request("P2P_FIND")

    def p2p_stop_find(self):
        """Issue P2P_STOP_FIND and return the raw reply."""
        return self.request("P2P_STOP_FIND")

    def wps_read_pin(self):
        """Store and return the WPS PIN used by the tests (currently fixed)."""
        #TODO: make this random
        self.pin = "12345670"
        return self.pin

    def peer_known(self, peer, full=True):
        """Return True if P2P_PEER reports peer.

        With full=True the peer additionally must not be flagged
        [PROBE_REQ_ONLY] in the reply.
        """
        res = self.request("P2P_PEER " + peer)
        if peer.lower() not in res.lower():
            return False
        if not full:
            return True
        return "[PROBE_REQ_ONLY]" not in res

    def discover_peer(self, peer, full=True, timeout=15, social=True):
        """Run P2P_FIND until peer is known or timeout seconds have elapsed.

        Returns True as soon as peer_known(peer, full) succeeds (including
        before any find is started) and False on timeout.  The peer table is
        polled once per second.
        """
        logger.info(self.ifname + ": Trying to discover peer " + peer)
        if self.peer_known(peer, full):
            return True
        self.p2p_find(social)
        count = 0
        while count < timeout:
            time.sleep(1)
            count = count + 1
            if self.peer_known(peer, full):
                return True
        return False

    def group_form_result(self, ev, expect_failure=False):
        """Parse a group formation event string into a result dict.

        With expect_failure=True a P2P-GROUP-STARTED event raises Exception;
        a P2P-GO-NEG-FAILURE event yields {'result': 'go-neg-failed',
        'status': <int>} and anything else yields None.

        Otherwise the event must be a parsable P2P-GROUP-STARTED line (else
        Exception is raised).  The returned dict has 'result', 'ifname',
        'role', 'ssid', 'freq', 'go_dev_addr' and whichever of 'psk' /
        'passphrase' the event carried.  As a side effect self.group_ifname
        is set to the reported interface name.
        """
        if expect_failure:
            if "P2P-GROUP-STARTED" in ev:
                raise Exception("Group formation succeeded when expecting failure")
            m = re.search(r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)', ev)
            if m is None:
                return None
            res = {}
            res['result'] = 'go-neg-failed'
            res['status'] = int(m.group(2))
            return res

        if "P2P-GROUP-STARTED" not in ev:
            raise Exception("No P2P-GROUP-STARTED event seen")

        exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)'
        m = re.search(exp, ev)
        if m is None:
            raise Exception("Could not parse P2P-GROUP-STARTED")
        res = {}
        res['result'] = 'success'
        res['ifname'] = m.group(2)
        # Remember where the group lives so group_request() can reach it.
        self.group_ifname = m.group(2)
        res['role'] = m.group(3)
        res['ssid'] = m.group(4)
        res['freq'] = m.group(5)
        key = m.group(6)
        p = re.match(r'psk=([0-9a-f]*)', key)
        if p:
            res['psk'] = p.group(1)
        p = re.match(r'passphrase="(.*)"', key)
        if p:
            res['passphrase'] = p.group(1)
        res['go_dev_addr'] = m.group(7)
        return res

    def _wait_group_formation(self, timeout, expect_failure):
        """Wait for a group formation outcome event and parse it.

        Returns None on timeout when expect_failure is true; raises Exception
        on timeout otherwise.
        """
        ev = self.wait_event(["P2P-GROUP-STARTED", "P2P-GO-NEG-FAILURE"], timeout)
        if ev is None:
            if expect_failure:
                return None
            raise Exception("Group formation timed out")
        self.dump_monitor()
        return self.group_form_result(ev, expect_failure)

    def p2p_go_neg_auth(self, peer, pin, method, go_intent=None):
        """Authorize GO negotiation with peer (P2P_CONNECT ... auth).

        go_intent is appended whenever it is not None, so an explicit intent
        of 0 is sent rather than being dropped.  Raises Exception if the peer
        cannot be discovered or the command is not accepted.
        """
        if not self.discover_peer(peer):
            raise Exception("Peer " + peer + " not found")
        self.dump_monitor()
        cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth"
        if go_intent is not None:
            cmd = cmd + ' go_intent=' + str(go_intent)
        if "OK" in self.request(cmd):
            return None
        raise Exception("P2P_CONNECT (auth) failed")

    def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False):
        """Wait for and parse the outcome of an authorized GO negotiation.

        See group_form_result() for the returned dict.  On timeout returns
        None if expect_failure is true, otherwise raises Exception.
        """
        return self._wait_group_formation(timeout, expect_failure)

    def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False):
        """Start GO negotiation with peer via P2P_CONNECT.

        pin may be falsy, in which case only method is sent.  go_intent is
        appended whenever it is not None (0 included).  With timeout == 0 the
        method returns None right after the command is accepted; otherwise it
        waits for the outcome and returns group_form_result() output (or None
        on timeout when expect_failure is true).  Raises Exception if the
        peer is not found, the command is rejected, or an unexpected timeout
        occurs.
        """
        if not self.discover_peer(peer):
            raise Exception("Peer " + peer + " not found")
        self.dump_monitor()
        if pin:
            cmd = "P2P_CONNECT " + peer + " " + pin + " " + method
        else:
            cmd = "P2P_CONNECT " + peer + " " + method
        if go_intent is not None:
            cmd = cmd + ' go_intent=' + str(go_intent)
        if "OK" not in self.request(cmd):
            raise Exception("P2P_CONNECT failed")
        if timeout == 0:
            self.dump_monitor()
            return None
        return self._wait_group_formation(timeout, expect_failure)

    def wait_event(self, events, timeout):
        """Poll the monitor connection for any of the given event substrings.

        The monitor is checked every 0.1 s for up to timeout seconds.  Every
        received message is logged; the first one containing any entry of
        events is returned.  Returns None if nothing matched in time.
        """
        count = 0
        while count < timeout * 10:
            count = count + 1
            time.sleep(0.1)
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.ifname + ": " + ev)
                for event in events:
                    if event in ev:
                        return ev
        return None

    def dump_monitor(self):
        """Drain and log every message currently pending on the monitor."""
        while self.mon.pending():
            ev = self.mon.recv()
            logger.debug(self.ifname + ": " + ev)

    def remove_group(self, ifname=None):
        """Remove a P2P group with P2P_GROUP_REMOVE.

        When ifname is None the recorded group interface is used, falling
        back to this object's own interface name.  Raises Exception if the
        command is not accepted; on success the recorded group interface is
        cleared.
        """
        if ifname is None:
            ifname = self.group_ifname if self.group_ifname else self.ifname
        if "OK" not in self.request("P2P_GROUP_REMOVE " + ifname):
            raise Exception("Group could not be removed")
        self.group_ifname = None

    def p2p_start_go(self, persistent=None, freq=None):
        """Start an autonomous GO with P2P_GROUP_ADD and return the result.

        persistent: None adds nothing, True adds "persistent", any other
        value adds "persistent=<value>".  freq, when truthy, is appended as
        "freq=<freq>" and may be given as a string or a number.  Waits up to
        5 s for P2P-GROUP-STARTED and returns group_form_result() output.
        Raises Exception if the command is rejected or the event times out.
        """
        self.dump_monitor()
        cmd = "P2P_GROUP_ADD"
        if persistent is None:
            pass
        elif persistent is True:
            cmd = cmd + " persistent"
        else:
            cmd = cmd + " persistent=" + str(persistent)
        if freq:
            cmd = cmd + " freq=" + str(freq)
        if "OK" not in self.request(cmd):
            raise Exception("P2P_GROUP_ADD failed")
        ev = self.wait_event(["P2P-GROUP-STARTED"], timeout=5)
        if ev is None:
            raise Exception("GO start up timed out")
        self.dump_monitor()
        return self.group_form_result(ev)

    def p2p_go_authorize_client(self, pin):
        """Issue "WPS_PIN any <pin>" on the group interface.

        Raises Exception if the reply contains FAIL.
        """
        cmd = "WPS_PIN any " + pin
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to authorize client connection on GO")
        return None

    def p2p_connect_group(self, go_addr, pin, timeout=0):
        """Join the group of go_addr with "P2P_CONNECT <addr> <pin> join".

        The GO is discovered first with a non-social find.  With timeout == 0
        the method returns None once the command is accepted; otherwise it
        waits for P2P-GROUP-STARTED and returns group_form_result() output.
        Raises Exception if the GO is not found, the command is rejected, or
        the wait times out.
        """
        self.dump_monitor()
        if not self.discover_peer(go_addr, social=False):
            raise Exception("GO " + go_addr + " not found")
        self.dump_monitor()
        cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
        if "OK" not in self.request(cmd):
            raise Exception("P2P_CONNECT(join) failed")
        if timeout == 0:
            self.dump_monitor()
            return None
        ev = self.wait_event(["P2P-GROUP-STARTED"], timeout)
        if ev is None:
            raise Exception("Joining the group timed out")
        self.dump_monitor()
        return self.group_form_result(ev)

    def tdls_setup(self, peer):
        """Issue TDLS_SETUP for peer on the group interface; raise on FAIL."""
        cmd = "TDLS_SETUP " + peer
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to request TDLS setup")
        return None

    def tdls_teardown(self, peer):
        """Issue TDLS_TEARDOWN for peer on the group interface; raise on FAIL."""
        cmd = "TDLS_TEARDOWN " + peer
        if "FAIL" in self.group_request(cmd):
            raise Exception("Failed to request TDLS teardown")
        return None

    def connect(self, ssid, psk=None, proto=None, key_mgmt=None, wep_key0=None, ieee80211w=None):
        """Add a network for ssid, configure it and wait for the connection.

        ssid and psk are sent quoted; proto, key_mgmt, ieee80211w and
        wep_key0 are sent verbatim.  Only truthy arguments are configured.
        Raises Exception if any command fails or the connection times out
        (see connect_network()).
        """
        logger.info("Connect STA " + self.ifname + " to AP")
        id = self.add_network()
        self.set_network_quoted(id, "ssid", ssid)
        if psk:
            self.set_network_quoted(id, "psk", psk)
        if proto:
            self.set_network(id, "proto", proto)
        if key_mgmt:
            self.set_network(id, "key_mgmt", key_mgmt)
        if ieee80211w:
            self.set_network(id, "ieee80211w", ieee80211w)
        if wep_key0:
            self.set_network(id, "wep_key0", wep_key0)
        self.connect_network(id)

    def scan(self, type=None):
        """Trigger a scan and wait up to 15 s for CTRL-EVENT-SCAN-RESULTS.

        type, when given, is sent as "SCAN TYPE=<type>".  Raises Exception if
        the scan cannot be started or no results event arrives in time.
        """
        if type:
            cmd = "SCAN TYPE=" + type
        else:
            cmd = "SCAN"
        self.dump_monitor()
        if "OK" not in self.request(cmd):
            raise Exception("Failed to trigger scan")
        ev = self.wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15)
        if ev is None:
            raise Exception("Scan timed out")

    def roam(self, bssid):
        """Issue ROAM to bssid and wait up to 10 s for CTRL-EVENT-CONNECTED.

        Raises Exception immediately if the command is not accepted, instead
        of waiting for a connection event that cannot arrive, and also if the
        connection event does not arrive in time.
        """
        self.dump_monitor()
        if "OK" not in self.request("ROAM " + bssid):
            raise Exception("ROAM failed")
        ev = self.wait_event(["CTRL-EVENT-CONNECTED"], timeout=10)
        if ev is None:
            raise Exception("Roaming with the AP timed out")
        self.dump_monitor()