]> git.ipfire.org Git - thirdparty/hostap.git/blame - tests/hwsim/fst_module_aux.py
Fix PMKID addition to RSN element when RSN Capabilities are not present
[thirdparty/hostap.git] / tests / hwsim / fst_module_aux.py
CommitLineData
41a256ec
AN
1# FST tests related classes
2# Copyright (c) 2015, Qualcomm Atheros, Inc.
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import logging
8import subprocess
9import os
10import signal
11import time
12import re
13
14import hostapd
15import wpaspy
16import utils
17from wpasupplicant import WpaSupplicant
18
19import fst_test_common
20
21logger = logging.getLogger()
22
def parse_fst_iface_event(ev):
    """Parse an FST iface event string, e.g.
    "<3>FST-EVENT-IFACE attached ifname=wlan9 group=fstg0".

    Returns a dictionary with "event_type" ('attached' or 'detached') and,
    when present in the string, "ifname" and "group". Returns None if the
    string is not an FST-EVENT-IFACE event or names neither 'attached' nor
    'detached'."""
    if "FST-EVENT-IFACE" not in ev:
        return None
    event = {}
    # 'attached' is tested first, exactly as before; 'detached' does not
    # contain 'attached' as a substring, so the two cannot be confused.
    if "attached" in ev:
        event['event_type'] = 'attached'
    elif "detached" in ev:
        event['event_type'] = 'detached'
    else:
        return None
    # Raw string for the pattern: a plain "\S" is an invalid string escape.
    for key in ('ifname', 'group'):
        found = re.search(key + r"=(\S+)", ev)
        if found is not None:
            event[key] = found.group(1)
    return event
44
def parse_fst_session_event(ev):
    """Parse an FST session event string, e.g.
    "<3>FST-EVENT-SESSION event_type=EVENT_FST_SESSION_STATE session_id=0 reason=REASON_STT".

    Returns a dictionary that always holds "type" and "new_state" (an empty
    string when the event carries no new_state) and, when present in the
    string, "id" (the session_id digits, kept as a string), "old_state" and
    "reason". Returns None if the string is not an FST-EVENT-SESSION event
    or has no event_type field."""
    if "FST-EVENT-SESSION" not in ev:
        return None
    found = re.search(r"event_type=(\S+)", ev)
    if found is None:
        return None
    # 'new_state' always exists in the dictionary so callers can index it
    # without checking.
    event = {'new_state': '', 'type': found.group(1)}
    # Raw strings for the patterns: plain "\S"/"\d" are invalid string
    # escapes.
    optional_fields = (
        ('id', r"session_id=(\d+)"),
        ('old_state', r"old_state=(\S+)"),
        ('new_state', r"new_state=(\S+)"),
        ('reason', r"reason=(\S+)"),
    )
    for key, pattern in optional_fields:
        found = re.search(pattern, ev)
        if found is not None:
            event[key] = found.group(1)
    return event
71
def start_two_ap_sta_pairs(apdev, rsn=False):
    """Create and start two FST APs and two FST stations.

    The first AP ('fst_11a', mode 'a') and the first station ('wlan5') use
    the low default priority; the second AP ('fst_11g', mode 'g') and the
    second station ('wlan6') use the high one. All four share the default
    FST group and llt from fst_test_common. Each device is started right
    after it is constructed.

    Returns the tuple (ap1, ap2, sta1, sta2)."""
    defaults = fst_test_common

    ap1 = FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
                defaults.fst_test_def_chan_a,
                defaults.fst_test_def_group,
                defaults.fst_test_def_prio_low,
                defaults.fst_test_def_llt, rsn=rsn)
    ap1.start()

    ap2 = FstAP(apdev[1]['ifname'], 'fst_11g', 'g',
                defaults.fst_test_def_chan_g,
                defaults.fst_test_def_group,
                defaults.fst_test_def_prio_high,
                defaults.fst_test_def_llt, rsn=rsn)
    ap2.start()

    sta1 = FstSTA('wlan5',
                  defaults.fst_test_def_group,
                  defaults.fst_test_def_prio_low,
                  defaults.fst_test_def_llt, rsn=rsn)
    sta1.start()

    sta2 = FstSTA('wlan6',
                  defaults.fst_test_def_group,
                  defaults.fst_test_def_prio_high,
                  defaults.fst_test_def_llt, rsn=rsn)
    sta2.start()

    return ap1, ap2, sta1, sta2
99
def stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2):
    """Stop both stations first, then both APs."""
    for device in (sta1, sta2, ap1, ap2):
        device.stop()
105
def connect_two_ap_sta_pairs(ap1, ap2, dev1, dev2, rsn=False):
    """Connect each station to its own AP: dev1 to ap1 and dev2 to ap2.

    Both stations scan first (dev1 on the default 'a' frequency, dev2 on the
    default 'g' frequency). With rsn set the connection uses the PSK
    "12345678"; otherwise it is an open connection (key_mgmt NONE)."""
    freq_a = fst_test_common.fst_test_def_freq_a
    freq_g = fst_test_common.fst_test_def_freq_g

    dev1.scan(freq=freq_a)
    dev2.scan(freq=freq_g)

    if rsn:
        security = {'psk': "12345678"}
    else:
        security = {'key_mgmt': "NONE"}

    dev1.connect(ap1, scan_freq=freq_a, **security)
    dev2.connect(ap2, scan_freq=freq_g, **security)
41a256ec
AN
121
def disconnect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
    """Disconnect both stations; the AP arguments are not used."""
    for station in (dev1, dev2):
        station.disconnect()
125
def external_sta_connect(sta, ap, **kwargs):
    """Connect an external (non-FST) station to the given FST AP.

    'sta' must be a WpaSupplicant and 'ap' an FstAP; otherwise an Exception
    is raised. Extra keyword arguments are passed on to sta.connect()."""
    if not isinstance(sta, WpaSupplicant):
        raise Exception("Bad STA object")
    if not isinstance(ap, FstAP):
        raise Exception("Bad AP object to connect to")
    # The returned instance is not used here; the call is kept because
    # FstAP.get_instance() creates the per-interface instance on first use.
    ap.get_instance()
    ssid = ap.get_ssid()
    sta.connect(ssid, **kwargs)
134
def disconnect_external_sta(sta, ap, check_disconnect=True):
    """Disconnect an external (non-FST) station from the given FST AP.

    'sta' must be a WpaSupplicant and 'ap' an FstAP; otherwise an Exception
    is raised. Unless check_disconnect is false, waits up to 10 seconds for
    AP-STA-DISCONNECTED on the AP and raises an Exception if it does not
    arrive."""
    if not isinstance(sta, WpaSupplicant):
        raise Exception("Bad STA object")
    if not isinstance(ap, FstAP):
        raise Exception("Bad AP object to connect to")
    sta.request("DISCONNECT")
    if not check_disconnect:
        return
    seen = ap.get_instance().wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
    if seen is None:
        raise Exception("No disconnection event received from %s" % ap.get_ssid())
147
148#
149# FstDevice class
150# This is the parent class for the AP (FstAP) and STA (FstSTA) that implements
151# FST functionality.
152#
class FstDevice:
    """Parent class for the AP (FstAP) and STA (FstSTA) that implements FST
    functionality.

    Subclasses must override get_instance(), get_own_mac_address(),
    get_actual_peer_addr(), grequest() and wait_gevent(); the versions
    defined here only raise an Exception."""

    def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
        """Store the FST parameters and check that FST is supported.

        Sends "FST-MANAGER TEST_REQUEST IS_SUPPORTED" through grequest() and
        raises utils.HwsimSkip unless the reply starts with "OK". Because
        grequest() is called here, a subclass has to set up whatever its
        grequest() needs before calling this constructor."""
        self.iface = iface
        self.fst_group = fst_group
        self.fst_pri = fst_pri
        self.fst_llt = fst_llt # None llt means no llt parameter will be set
        self.instance = None # Hostapd/WpaSupplicant instance
        self.peer_obj = None # Peer object, must be a FstDevice child object
        self.new_peer_addr = None # Peer MAC address for new session iface
        self.old_peer_addr = None # Peer MAC address for old session iface
        self.role = 'initiator' # Role: initiator/responder
        self.rsn = rsn
        s = self.grequest("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
        if not s.startswith('OK'):
            raise utils.HwsimSkip("FST not supported")

    def ifname(self):
        """Returns the interface name given to the constructor"""
        return self.iface

    def get_instance(self):
        """Gets the Hostapd/WpaSupplicant instance"""
        raise Exception("Virtual get_instance() called!")

    def get_own_mac_address(self):
        """Gets the device's own MAC address"""
        raise Exception("Virtual get_own_mac_address() called!")

    def get_new_peer_addr(self):
        """Returns the stored peer address for the new session iface"""
        return self.new_peer_addr

    def get_old_peer_addr(self):
        """Returns the stored peer address for the old session iface"""
        return self.old_peer_addr

    def get_actual_peer_addr(self):
        """Gets the peer address. A connected AP/station address is returned."""
        raise Exception("Virtual get_actual_peer_addr() called!")

    def grequest(self, req):
        """Send request on the global control interface"""
        # Call form of raise: the old 'raise Exception, "..."' statement is
        # a syntax error on Python 3.
        raise Exception("Virtual grequest() called!")

    def wait_gevent(self, events, timeout=None):
        """Wait for a list of events on the global interface"""
        raise Exception("Virtual wait_gevent() called!")

    def request(self, req):
        """Issue a request to the control interface"""
        h = self.get_instance()
        return h.request(req)

    def wait_event(self, events, timeout=None):
        """Wait for an event from the control interface. The timeout is only
        passed on when it is not None."""
        h = self.get_instance()
        if timeout is not None:
            return h.wait_event(events, timeout=timeout)
        else:
            return h.wait_event(events)

    def set_old_peer_addr(self, peer_addr=None):
        """Sets the peer address for the old session iface. If peer_addr is
        None, the value of get_actual_peer_addr() is used."""
        if peer_addr is not None:
            self.old_peer_addr = peer_addr
        else:
            self.old_peer_addr = self.get_actual_peer_addr()

    def set_new_peer_addr(self, peer_addr=None):
        """Sets the peer address for the new session iface. If peer_addr is
        None, the value of get_actual_peer_addr() is used."""
        if peer_addr is not None:
            self.new_peer_addr = peer_addr
        else:
            self.new_peer_addr = self.get_actual_peer_addr()

    def add_peer(self, obj, old_peer_addr=None, new_peer_addr=None):
        """Add peer for FST session(s). 'obj' is a FstDevice subclass object.
        The method must be called before add_session().
        If a peer address is not specified, the address returned by
        get_actual_peer_addr() is used for it.
        Raises an Exception if 'obj' is not a FstDevice."""
        if not isinstance(obj, FstDevice):
            raise Exception("Peer must be a FstDevice object")
        self.peer_obj = obj
        self.set_old_peer_addr(old_peer_addr)
        self.set_new_peer_addr(new_peer_addr)

    def get_peer(self):
        """Returns peer object"""
        return self.peer_obj

    def set_fst_parameters(self, group_id=None, pri=None, llt=None):
        """Change/set new FST parameters. Can be used to start FST sessions with
        different FST parameters than defined in the configuration file.
        Arguments left as None keep their current value."""
        if group_id is not None:
            self.fst_group = group_id
        if pri is not None:
            self.fst_pri = pri
        if llt is not None:
            self.fst_llt = llt

    def get_local_mbies(self, ifname=None):
        """Returns the reply to the GET_LOCAL_MBIES test request for 'ifname'
        (the device's own iface when ifname is None)"""
        if_name = ifname if ifname is not None else self.iface
        return self.grequest("FST-MANAGER TEST_REQUEST GET_LOCAL_MBIES " + if_name)

    def add_session(self):
        """Adds an FST session. add_peer() must be called before calling this
        function.
        Returns the stripped reply to SESSION_ADD (the session id). Raises an
        Exception if no peer was added or the reply starts with "FAIL"."""
        if self.peer_obj is None:
            raise Exception("Peer wasn't added before starting session")
        grp = ' ' + self.fst_group if self.fst_group != '' else ''
        sid = self.grequest("FST-MANAGER SESSION_ADD" + grp)
        sid = sid.strip()
        if sid.startswith("FAIL"):
            raise Exception("Cannot add FST session with groupid ==" + grp)
        return sid

    def set_session_param(self, params):
        """Sends SESSION_SET with the given parameter string (omitted when
        None or empty) and returns the reply"""
        request = "FST-MANAGER SESSION_SET"
        if params is not None and params != '':
            request = request + ' ' + params
        return self.grequest(request)

    def get_session_params(self, sid):
        """Returns the SESSION_GET reply for session 'sid' as a dictionary
        built from its "name=value" lines, or None if the reply starts with
        "FAIL"."""
        request = "FST-MANAGER SESSION_GET " + sid
        res = self.grequest(request)
        if res.startswith("FAIL"):
            return None
        params = {}
        for line in res.splitlines():
            # Split on the first '=' only so that a value containing '=' is
            # kept whole; lines without '=' (e.g. blank ones) are skipped
            # instead of raising IndexError.
            name, sep, value = line.partition('=')
            if not sep:
                continue
            params[name] = value
        return params

    def iface_peers(self, ifname):
        """Returns the lines of the IFACE_PEERS reply for 'ifname' in this
        device's group, or None if the reply starts with "FAIL"."""
        res = self.grequest("FST-MANAGER IFACE_PEERS " + self.fst_group + ' ' + ifname)
        if res.startswith("FAIL"):
            return None
        return res.splitlines()

    def get_peer_mbies(self, ifname, peer_addr):
        """Returns the reply to GET_PEER_MBIES for the given iface and peer"""
        return self.grequest("FST-MANAGER GET_PEER_MBIES %s %s" % (ifname, peer_addr))

    def list_ifaces(self):
        """Returns a list of dictionaries with 'name', 'priority' and 'llt'
        (all strings) parsed from the colon-separated lines of the
        LIST_IFACES reply for this device's group, or None if the reply
        starts with "FAIL"."""
        res = self.grequest("FST-MANAGER LIST_IFACES " + self.fst_group)
        if res.startswith("FAIL"):
            return None
        ifaces = []
        for i in res.splitlines():
            p = i.split(':')
            iface = {}
            iface['name'] = p[0]
            iface['priority'] = p[1]
            iface['llt'] = p[2]
            ifaces.append(iface)
        return ifaces

    def list_groups(self):
        """Returns the lines of the LIST_GROUPS reply, or None if the reply
        starts with "FAIL"."""
        res = self.grequest("FST-MANAGER LIST_GROUPS")
        if res.startswith("FAIL"):
            return None
        return res.splitlines()

    def configure_session(self, sid, new_iface, old_iface = None):
        """Calls session_set for a number of parameters some of which are stored
        in "self" while others are passed to this function explicitly.
        old_ifname is always set: to old_iface, or to the device's own iface
        if old_iface is None. new_ifname is set only if new_iface is not
        None. The stored new/old peer addresses and llt are set only when
        they are neither None nor empty.
        Raises an Exception if any of the replies does not start with "OK"."""
        oldiface = old_iface if old_iface is not None else self.iface
        s = self.set_session_param(sid + ' old_ifname=' + oldiface)
        if not s.startswith("OK"):
            raise Exception("Cannot set FST session old_ifname: " + s)
        if new_iface is not None:
            s = self.set_session_param(sid + " new_ifname=" + new_iface)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session new_ifname:" + s)
        if self.new_peer_addr is not None and self.new_peer_addr != '':
            s = self.set_session_param(sid + " new_peer_addr=" + self.new_peer_addr)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session peer address:" + s + " (new)")
        if self.old_peer_addr is not None and self.old_peer_addr != '':
            s = self.set_session_param(sid + " old_peer_addr=" + self.old_peer_addr)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session peer address:" + s + " (old)")
        if self.fst_llt is not None and self.fst_llt != '':
            s = self.set_session_param(sid + " llt=" + self.fst_llt)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session llt:" + s)

    def send_iface_attach_request(self, ifname, group, llt, priority):
        """Sends FST-ATTACH for 'ifname' and 'group'; llt and priority are
        appended only when not None. Raises an Exception unless the reply
        starts with "OK"."""
        request = "FST-ATTACH " + ifname + ' ' + group
        if llt is not None:
            request += " llt=" + llt
        if priority is not None:
            request += " priority=" + priority
        res = self.grequest(request)
        if not res.startswith("OK"):
            raise Exception("Cannot attach FST iface: " + res)

    def send_iface_detach_request(self, ifname):
        """Sends FST-DETACH for 'ifname'. Raises an Exception unless the
        reply starts with "OK"."""
        res = self.grequest("FST-DETACH " + ifname)
        if not res.startswith("OK"):
            raise Exception("Cannot detach FST iface: " + res)

    def send_session_setup_request(self, sid):
        """Sends SESSION_INITIATE for 'sid' and returns the reply. Raises an
        Exception unless the reply starts with "OK"."""
        s = self.grequest("FST-MANAGER SESSION_INITIATE " + sid)
        if not s.startswith('OK'):
            raise Exception("Cannot send setup request: %s" % s)
        return s

    def send_session_setup_response(self, sid, response):
        """Sends SESSION_RESPOND for 'sid' with the given response and
        returns the reply. Raises an Exception unless the reply starts with
        "OK"."""
        request = "FST-MANAGER SESSION_RESPOND " + sid + " " + response
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot send setup response: %s" % s)
        return s

    def send_test_session_setup_request(self, fsts_id,
                                        additional_parameter = None):
        """Sends the SEND_SETUP_REQUEST test request for 'fsts_id' (plus the
        additional parameter, if given) and returns the reply. Raises an
        Exception unless the reply starts with "OK"."""
        request = "FST-MANAGER TEST_REQUEST SEND_SETUP_REQUEST " + fsts_id
        if additional_parameter is not None:
            request += " " + additional_parameter
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST setup request: %s" % s)
        return s

    def send_test_session_setup_response(self, fsts_id,
                                         response, additional_parameter = None):
        """Sends the SEND_SETUP_RESPONSE test request for 'fsts_id' with the
        given response (plus the additional parameter, if given) and returns
        the reply. Raises an Exception unless the reply starts with "OK"."""
        request = "FST-MANAGER TEST_REQUEST SEND_SETUP_RESPONSE " + fsts_id + " " + response
        if additional_parameter is not None:
            request += " " + additional_parameter
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST setup response: %s" % s)
        return s

    def send_test_ack_request(self, fsts_id):
        """Sends the SEND_ACK_REQUEST test request for 'fsts_id' and returns
        the reply. Raises an Exception unless the reply starts with "OK"."""
        s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_REQUEST " + fsts_id)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST ack request: %s" % s)
        return s

    def send_test_ack_response(self, fsts_id):
        """Sends the SEND_ACK_RESPONSE test request for 'fsts_id' and returns
        the reply. Raises an Exception unless the reply starts with "OK"."""
        s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_RESPONSE " + fsts_id)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST ack response: %s" % s)
        return s

    def send_test_tear_down(self, fsts_id):
        """Sends the SEND_TEAR_DOWN test request for 'fsts_id' and returns
        the reply. Raises an Exception unless the reply starts with "OK"."""
        s = self.grequest("FST-MANAGER TEST_REQUEST SEND_TEAR_DOWN " + fsts_id)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST tear down: %s" % s)
        return s

    def get_fsts_id_by_sid(self, sid):
        """Returns the reply to the GET_FSTS_ID test request for session
        'sid' converted to int. Raises an Exception if the reply is a single
        space or starts with "FAIL"."""
        s = self.grequest("FST-MANAGER TEST_REQUEST GET_FSTS_ID " + sid)
        if s == ' ' or s.startswith('FAIL'):
            raise Exception("Cannot get fsts_id for sid == %s" % sid)
        return int(s)

    def _parse_session_event(self, ev):
        """Parses a session event string that must be valid; raises an
        Exception naming the string if it cannot be parsed."""
        event = parse_fst_session_event(ev)
        if event is None:
            # Concatenate rather than use '%': the format string has no
            # conversion specifier, so '%' raised TypeError instead of
            # reporting the event.
            raise Exception("Unrecognized FST event: " + ev)
        return event

    def wait_for_iface_event(self, timeout):
        """Waits on the global interface for the next FST-EVENT-IFACE that
        can be parsed and returns the parsed dictionary. Raises an Exception
        if a wait returns no event."""
        while True:
            ev = self.wait_gevent(["FST-EVENT-IFACE"], timeout)
            if ev is None:
                raise Exception("No FST-EVENT-IFACE received")
            event = parse_fst_iface_event(ev)
            if event is None:
                # We can't parse so it's not our event, wait for next one
                continue
            return event

    def wait_for_session_event(self, timeout, events_to_ignore=None,
                               events_to_count=None):
        """Waits on the global interface for the next FST-EVENT-SESSION that
        can be parsed and passes the filters, and returns the parsed
        dictionary.
        If events_to_ignore is non-empty, events whose type is listed there
        are skipped (and events_to_count is not consulted). Otherwise, if
        events_to_count is non-empty, only events whose type is listed there
        are returned.
        Raises an Exception if a wait returns no event."""
        # None instead of [] as default: avoids sharing one mutable list
        # object between calls.
        if events_to_ignore is None:
            events_to_ignore = []
        if events_to_count is None:
            events_to_count = []
        while True:
            ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout)
            if ev is None:
                raise Exception("No FST-EVENT-SESSION received")
            event = parse_fst_session_event(ev)
            if event is None:
                # We can't parse so it's not our event, wait for next one
                continue
            if len(events_to_ignore) > 0:
                if event['type'] in events_to_ignore:
                    continue
            elif len(events_to_count) > 0:
                if not event['type'] in events_to_count:
                    continue
            return event

    def initiate_session(self, sid, response="accept"):
        """Initiates FST session with given session id 'sid'.
        'response' is the session respond answer: "accept", "reject", or a
        special "timeout" value to skip the response in order to test session
        timeouts. An empty 'response' returns "OK" right after the peer has
        reported SETUP_COMPLETION.
        Returns: "OK" - session has been initiated, otherwise the reason for the
        reset: REASON_REJECT, REASON_STT.
        Raises an Exception if a request fails or an expected event does not
        arrive."""
        strsid = ' ' + sid if sid != '' else ''
        s = self.grequest("FST-MANAGER SESSION_INITIATE"+ strsid)
        if not s.startswith('OK'):
            raise Exception("Cannot initiate fst session: %s" % s)
        ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
        if ev is None:
            raise Exception("No FST-EVENT-SESSION received")
        # We got FST event
        event = self._parse_session_event(ev)
        if event['type'] != 'EVENT_FST_SETUP':
            raise Exception("Expected FST_SETUP event, got: " + event['type'])
        ev = self.peer_obj.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
        if ev is None:
            raise Exception("No FST-EVENT-SESSION received")
        event = self._parse_session_event(ev)
        if event['type'] != 'EVENT_FST_SESSION_STATE':
            raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
        if event['new_state'] != "SETUP_COMPLETION":
            raise Exception("Expected new state SETUP_COMPLETION, got: " + event['new_state'])
        if response == '':
            return 'OK'
        if response != "timeout":
            s = self.peer_obj.grequest("FST-MANAGER SESSION_RESPOND "+ event['id'] + " " + response)  # Or reject
            if not s.startswith('OK'):
                raise Exception("Error session_respond: %s" % s)
        # Wait for EVENT_FST_SESSION_STATE events. We should get at least 2
        # events. The 1st event will be EVENT_FST_SESSION_STATE
        # old_state=INITIAL new_state=SETUP_COMPLETED. The 2nd event will be
        # either EVENT_FST_ESTABLISHED with the session id or
        # EVENT_FST_SESSION_STATE with new_state=INITIAL if the session was
        # reset, the reason field will tell why.
        result = ''
        while result == '':
            ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
            if ev is None:
                break # No session event received
            event = parse_fst_session_event(ev)
            if event is None:
                # We can't parse so it's not our event, wait for next one
                continue
            if event['type'] == 'EVENT_FST_ESTABLISHED':
                result = "OK"
                break
            elif event['type'] == "EVENT_FST_SESSION_STATE":
                if event['new_state'] == "INITIAL":
                    # Session was reset, the only reason to get back to initial
                    # state.
                    result = event['reason']
                    break
        if result == '':
            raise Exception("No event for session respond")
        return result

    def transfer_session(self, sid):
        """Transfers the session. 'sid' is the session id. The events are
        awaited on the peer object set with add_peer().
        Returns: REASON_SWITCH - the session has been transferred successfully
        or a REASON_... reported by the reset event.
        Raises an Exception if the request fails or an event is missing or
        cannot be parsed."""
        request = "FST-MANAGER SESSION_TRANSFER"
        if sid != '':
            request += ' ' + sid
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot transfer fst session: %s" % s)
        result = ''
        while result == '':
            ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
            if ev is None:
                raise Exception("Missing session transfer event")
            # We got FST event. We expect TRANSITION_CONFIRMED state and then
            # INITIAL (reset) with the reason (e.g. "REASON_SWITCH").
            # Right now we'll be waiting for the reset event and record the
            # reason.
            event = self._parse_session_event(ev)
            if event['new_state'] == 'INITIAL':
                result = event['reason']
        return result

    def wait_for_tear_down(self):
        """Waits up to 5 seconds for a session event on the global interface
        and checks that it is EVENT_FST_SESSION_STATE with new state INITIAL
        and reason REASON_TEARDOWN. Raises an Exception otherwise."""
        ev = self.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
        if ev is None:
            raise Exception("No FST-EVENT-SESSION received")
        # We got FST event
        event = self._parse_session_event(ev)
        if event['type'] != 'EVENT_FST_SESSION_STATE':
            raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
        if event['new_state'] != "INITIAL":
            raise Exception("Expected new state INITIAL, got: " + event['new_state'])
        if event['reason'] != 'REASON_TEARDOWN':
            raise Exception("Expected reason REASON_TEARDOWN, got: " + event['reason'])

    def teardown_session(self, sid):
        """Tears down FST session with a given session id ('sid') and waits
        for the tear down event on the peer"""
        strsid = ' ' + sid if sid != '' else ''
        s = self.grequest("FST-MANAGER SESSION_TEARDOWN" + strsid)
        if not s.startswith('OK'):
            raise Exception("Cannot tear down fst session: %s" % s)
        self.peer_obj.wait_for_tear_down()

    def remove_session(self, sid, wait_for_tear_down=True):
        """Removes FST session with a given session id ('sid'). When
        wait_for_tear_down is True, also waits for the tear down event on
        the peer."""
        strsid = ' ' + sid if sid != '' else ''
        s = self.grequest("FST-MANAGER SESSION_REMOVE" + strsid)
        if not s.startswith('OK'):
            raise Exception("Cannot remove fst session: %s" % s)
        if wait_for_tear_down == True:
            self.peer_obj.wait_for_tear_down()

    def remove_all_sessions(self):
        """Removes every FST session listed by LIST_SESSIONS for this
        device's group, without waiting for tear down events. Does nothing
        if the LIST_SESSIONS reply starts with "FAIL"."""
        grp = ' ' + self.fst_group if self.fst_group != '' else ''
        s = self.grequest("FST-MANAGER LIST_SESSIONS" + grp)
        if not s.startswith('FAIL'):
            for sid in s.splitlines():
                sid = sid.strip()
                if len(sid) != 0:
                    self.remove_session(sid, wait_for_tear_down=False)
574
575
576#
577# FstAP class
578#
class FstAP (FstDevice):
    """FST device backed by hostapd: the per-interface instance is a
    hostapd.Hostapd object and global requests/events go through a
    hostapd.HostapdGlobal object."""

    def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
                 fst_llt=None, rsn=False):
        """If fst_group is empty, the interface is not attached to FST by
        start(). A None fst_llt means no llt parameter is sent with the
        attach request."""
        self.ssid = ssid
        self.mode = mode
        self.chan = chan
        self.reg_ctrl = fst_test_common.HapdRegCtrl()
        self.reg_ctrl.add_ap(iface, self.chan)
        # The global instance has to exist before the base constructor runs
        # because that constructor already sends a request via grequest().
        self.global_instance = hostapd.HostapdGlobal()
        FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)

    def start(self, return_early=False):
        """Adds the AP through hostapd.add_ap() and, unless return_early is
        set or the FST group is empty, attaches the interface to the FST
        group.
        Returns the object returned by hostapd.add_ap(). Raises an Exception
        if that object does not answer ping()."""
        params = {
            'ssid': self.ssid,
            'hw_mode': self.mode,
            'channel': self.chan,
            'country_code': 'US',
        }
        if self.rsn:
            params['wpa'] = '2'
            params['wpa_key_mgmt'] = 'WPA-PSK'
            params['rsn_pairwise'] = 'CCMP'
            params['wpa_passphrase'] = '12345678'
        self.hapd = hostapd.add_ap(self.iface, params)
        if not self.hapd.ping():
            raise Exception("Could not ping FST hostapd")
        self.reg_ctrl.start()
        self.get_global_instance()
        attach = not return_early and len(self.fst_group) != 0
        if attach:
            self.send_iface_attach_request(self.iface, self.fst_group,
                                           self.fst_llt, self.fst_pri)
        return self.hapd

    def stop(self):
        """Removes all sessions and detaches the interface (only when the
        FST group is not empty), stops the reg_ctrl helper and drops the
        global instance."""
        attached = len(self.fst_group) != 0
        if attached:
            self.remove_all_sessions()
            self.send_iface_detach_request(self.iface)
        self.reg_ctrl.stop()
        del self.global_instance
        self.global_instance = None

    def get_instance(self):
        """Return the hostapd.Hostapd instance for this interface, creating
        it on first use"""
        if self.instance is not None:
            return self.instance
        self.instance = hostapd.Hostapd(self.iface)
        return self.instance

    def get_global_instance(self):
        """Return the hostapd.HostapdGlobal instance (None after stop())"""
        return self.global_instance

    def get_own_mac_address(self):
        """Gets the device's own MAC address: the 'bssid[0]' entry of the
        instance status"""
        return self.get_instance().get_status()['bssid[0]']

    def get_actual_peer_addr(self):
        """Gets the peer address: the 'addr' entry of get_sta(None) on the
        device instance, or None if no such entry is available."""
        # Use the device instance, the global control interface doesn't have
        # station address
        sta = self.get_instance().get_sta(None)
        if sta is not None and 'addr' in sta:
            return sta['addr']
        # Maybe station is not connected?
        return None

    def grequest(self, req):
        """Send request on the global control interface"""
        logger.debug("FstAP::grequest: " + req)
        return self.get_global_instance().request(req)

    def wait_gevent(self, events, timeout=None):
        """Wait for a list of events on the global interface. The timeout is
        only passed on when it is not None."""
        glob = self.get_global_instance()
        if timeout is None:
            return glob.wait_event(events)
        return glob.wait_event(events, timeout=timeout)

    def get_ssid(self):
        """Return the SSID given to the constructor"""
        return self.ssid
673
674#
675# FstSTA class
676#
class FstSTA (FstDevice):
    """FST device backed by a WpaSupplicant object that is reached through
    the global interface '/tmp/wpas-wlan5'; global requests and events use
    that same object."""

    def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
        """If fst_group is empty, the interface is not attached to FST by
        start(). A None fst_llt means no llt parameter is sent with the
        attach request."""
        FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)
        self.connected = None # AP object the station is connected to

    def start(self):
        """Adds the interface to the wpa_supplicant instance (with
        force_connect_cmd=1) and, if the FST group is not empty, attaches it
        to the group. Raises an Exception if the global ping fails.
        Returns None."""
        wpas = self.get_instance()
        wpas.interface_add(self.iface, drv_params="force_connect_cmd=1")
        if not wpas.global_ping():
            raise Exception("Could not ping FST wpa_supplicant")
        if len(self.fst_group) != 0:
            self.send_iface_attach_request(self.iface, self.fst_group,
                                           self.fst_llt, self.fst_pri)
        return None

    def stop(self):
        """Removes all sessions and detaches the interface (only when the
        FST group is not empty), then removes the interface, closes the
        control interface and forgets the instance."""
        wpas = self.get_instance()
        if len(self.fst_group) != 0:
            self.remove_all_sessions()
            self.send_iface_detach_request(self.iface)
        wpas.interface_remove(self.iface)
        wpas.close_ctrl()
        del wpas
        self.instance = None

    def get_instance(self):
        """Return the WpaSupplicant instance, creating it on first use"""
        if self.instance is not None:
            return self.instance
        self.instance = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        return self.instance

    def get_own_mac_address(self):
        """Gets the device's own MAC address: the 'address' entry of the
        instance status"""
        return self.get_instance().get_status()['address']

    def get_actual_peer_addr(self):
        """Gets the peer address: the 'bssid' entry of the instance status"""
        return self.get_instance().get_status()['bssid']

    def grequest(self, req):
        """Send request on the global control interface"""
        logger.debug("FstSTA::grequest: " + req)
        return self.get_instance().global_request(req)

    def wait_gevent(self, events, timeout=None):
        """Wait for a list of events on the global interface. The timeout is
        only passed on when it is not None."""
        wpas = self.get_instance()
        if timeout is None:
            return wpas.wait_global_event(events)
        return wpas.wait_global_event(events, timeout=timeout)

    def scan(self, freq=None, no_wait=False, only_new=False):
        """Issue a scan with the given parameters and return the result of
        get_bss('0'), i.e. the first BSS entry (TODO: What if the AP
        required is not the 1st in list?). Note, request("SCAN_RESULTS") can
        be used to get all the results at once."""
        wpas = self.get_instance()
        wpas.scan(None, freq, no_wait, only_new)
        return wpas.get_bss('0')

    def connect(self, ap, **kwargs):
        """Connects to the given AP, which must be an FstAP, and remembers
        it as the connected AP. Keyword arguments are passed on to the
        instance's connect()."""
        if not isinstance(ap, FstAP):
            raise Exception("Bad AP object to connect to")
        wpas = self.get_instance()
        # The returned AP instance is not used here; the call is kept
        # because FstAP.get_instance() creates it on first use.
        ap.get_instance()
        wpas.connect(ap.get_ssid(), **kwargs)
        self.connected = ap

    def connect_to_external_ap(self, ap, ssid, check_connection=True, **kwargs):
        """Connects to the given external AP, which must be a
        hostapd.Hostapd, using 'ssid'. With check_connection set, waits up
        to 10 seconds for AP-STA-CONNECTED on the AP; if it does not arrive
        the connected AP is cleared and an Exception is raised."""
        if not isinstance(ap, hostapd.Hostapd):
            raise Exception("Bad AP object to connect to")
        self.get_instance().connect(ssid, **kwargs)
        self.connected = ap
        if not check_connection:
            return
        if ap.wait_event([ "AP-STA-CONNECTED" ], timeout=10) is None:
            self.connected = None
            raise Exception("No connection event received from %s" % ssid)

    def disconnect(self, check_disconnect=True):
        """Disconnects from the FstAP the station is currently connected to;
        does nothing when no AP is remembered. With check_disconnect set,
        waits up to 10 seconds for AP-STA-DISCONNECTED on the AP instance
        and raises an Exception if it does not arrive (the remembered AP is
        then left in place)."""
        if self.connected is None:
            return
        self.get_instance().request("DISCONNECT")
        if check_disconnect:
            ap_instance = self.connected.get_instance()
            if ap_instance.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10) is None:
                raise Exception("No disconnection event received from %s" % self.connected.get_ssid())
        self.connected = None

    def disconnect_from_external_ap(self, check_disconnect=True):
        """Disconnects from the external AP the station is currently
        connected to; does nothing when no AP is remembered. With
        check_disconnect set, waits up to 10 seconds for AP-STA-DISCONNECTED
        on the remembered AP object itself and raises an Exception if it
        does not arrive (the remembered AP is then left in place)."""
        if self.connected is None:
            return
        self.get_instance().request("DISCONNECT")
        if check_disconnect:
            external_ap = self.connected
            if external_ap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10) is None:
                raise Exception("No disconnection event received from AP")
        self.connected = None