]>
git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/fst_module_aux.py
fa6078871db76268395f08be5b87d5baa8238f44
1 # FST tests related classes
2 # Copyright (c) 2015, Qualcomm Atheros, Inc.
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
16 from wpasupplicant
import WpaSupplicant
18 import fst_test_common
20 logger
= logging
.getLogger()
22 def parse_fst_iface_event(ev
):
23 """Parses FST iface event that comes as a string, e.g.
24 "<3>FST-EVENT-IFACE attached ifname=wlan9 group=fstg0"
25 Returns a dictionary with parsed "event_type", "ifname", and "group"; or
26 None if not an FST event or can't be parsed."""
28 if ev
.find("FST-EVENT-IFACE") == -1:
30 if ev
.find("attached") != -1:
31 event
['event_type'] = 'attached'
32 elif ev
.find("detached") != -1:
33 event
['event_type'] = 'detached'
36 f
= re
.search("ifname=(\S+)", ev
)
38 event
['ifname'] = f
.group(1)
39 f
= re
.search("group=(\S+)", ev
)
41 event
['group'] = f
.group(1)
44 def parse_fst_session_event(ev
):
45 """Parses FST session event that comes as a string, e.g.
46 "<3>FST-EVENT-SESSION event_type=EVENT_FST_SESSION_STATE session_id=0 reason=REASON_STT"
47 Returns a dictionary with parsed "type", "id", and "reason"; or None if not
48 a FST event or can't be parsed"""
50 if ev
.find("FST-EVENT-SESSION") == -1:
52 event
['new_state'] = '' # The field always exists in the dictionary
53 f
= re
.search("event_type=(\S+)", ev
)
56 event
['type'] = f
.group(1)
57 f
= re
.search("session_id=(\d+)", ev
)
59 event
['id'] = f
.group(1)
60 f
= re
.search("old_state=(\S+)", ev
)
62 event
['old_state'] = f
.group(1)
63 f
= re
.search("new_state=(\S+)", ev
)
65 event
['new_state'] = f
.group(1)
66 f
= re
.search("reason=(\S+)", ev
)
68 event
['reason'] = f
.group(1)
71 def start_two_ap_sta_pairs(apdev
, rsn
=False):
72 """auxiliary function that creates two pairs of APs and STAs"""
73 ap1
= FstAP(apdev
[0]['ifname'], 'fst_11a', 'a',
74 fst_test_common
.fst_test_def_chan_a
,
75 fst_test_common
.fst_test_def_group
,
76 fst_test_common
.fst_test_def_prio_low
,
77 fst_test_common
.fst_test_def_llt
, rsn
=rsn
)
79 ap2
= FstAP(apdev
[1]['ifname'], 'fst_11g', 'g',
80 fst_test_common
.fst_test_def_chan_g
,
81 fst_test_common
.fst_test_def_group
,
82 fst_test_common
.fst_test_def_prio_high
,
83 fst_test_common
.fst_test_def_llt
, rsn
=rsn
)
86 sta1
= FstSTA('wlan5',
87 fst_test_common
.fst_test_def_group
,
88 fst_test_common
.fst_test_def_prio_low
,
89 fst_test_common
.fst_test_def_llt
, rsn
=rsn
)
91 sta2
= FstSTA('wlan6',
92 fst_test_common
.fst_test_def_group
,
93 fst_test_common
.fst_test_def_prio_high
,
94 fst_test_common
.fst_test_def_llt
, rsn
=rsn
)
97 return ap1
, ap2
, sta1
, sta2
99 def stop_two_ap_sta_pairs(ap1
, ap2
, sta1
, sta2
):
105 def connect_two_ap_sta_pairs(ap1
, ap2
, dev1
, dev2
, rsn
=False):
106 """Connects a pair of stations, each one to a separate AP"""
107 dev1
.scan(freq
=fst_test_common
.fst_test_def_freq_a
)
108 dev2
.scan(freq
=fst_test_common
.fst_test_def_freq_g
)
111 dev1
.connect(ap1
, psk
="12345678",
112 scan_freq
=fst_test_common
.fst_test_def_freq_a
)
113 dev2
.connect(ap2
, psk
="12345678",
114 scan_freq
=fst_test_common
.fst_test_def_freq_g
)
116 dev1
.connect(ap1
, key_mgmt
="NONE",
117 scan_freq
=fst_test_common
.fst_test_def_freq_a
)
118 dev2
.connect(ap2
, key_mgmt
="NONE",
119 scan_freq
=fst_test_common
.fst_test_def_freq_g
)
121 def disconnect_two_ap_sta_pairs(ap1
, ap2
, dev1
, dev2
):
125 def external_sta_connect(sta
, ap
, **kwargs
):
126 """Connects the external station to the given AP"""
127 if not isinstance(sta
, WpaSupplicant
):
128 raise Exception("Bad STA object")
129 if not isinstance(ap
, FstAP
):
130 raise Exception("Bad AP object to connect to")
131 hap
= ap
.get_instance()
132 sta
.connect(ap
.get_ssid(), **kwargs
)
134 def disconnect_external_sta(sta
, ap
, check_disconnect
=True):
135 """Disconnects the external station from the AP"""
136 if not isinstance(sta
, WpaSupplicant
):
137 raise Exception("Bad STA object")
138 if not isinstance(ap
, FstAP
):
139 raise Exception("Bad AP object to connect to")
140 sta
.request("DISCONNECT")
142 hap
= ap
.get_instance()
143 ev
= hap
.wait_event([ "AP-STA-DISCONNECTED" ], timeout
=10)
145 raise Exception("No disconnection event received from %s" % ap
.get_ssid())
149 # This is the parent class for the AP (FstAP) and STA (FstSTA) that implements
153 def __init__(self
, iface
, fst_group
, fst_pri
, fst_llt
=None, rsn
=False):
155 self
.fst_group
= fst_group
156 self
.fst_pri
= fst_pri
157 self
.fst_llt
= fst_llt
# None llt means no llt parameter will be set
158 self
.instance
= None # Hostapd/WpaSupplicant instance
159 self
.peer_obj
= None # Peer object, must be a FstDevice child object
160 self
.new_peer_addr
= None # Peer MAC address for new session iface
161 self
.old_peer_addr
= None # Peer MAC address for old session iface
162 self
.role
= 'initiator' # Role: initiator/responder
163 s
= self
.grequest("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
164 if not s
.startswith('OK'):
165 raise utils
.HwsimSkip("FST not supported")
171 def get_instance(self
):
172 """Gets the Hostapd/WpaSupplicant instance"""
173 raise Exception("Virtual get_instance() called!")
175 def get_own_mac_address(self
):
176 """Gets the device's own MAC address"""
177 raise Exception("Virtual get_own_mac_address() called!")
179 def get_new_peer_addr(self
):
180 return self
.new_peer_addr
182 def get_old_peer_addr(self
):
183 return self
.old_peer_addr
185 def get_actual_peer_addr(self
):
186 """Gets the peer address. A connected AP/station address is returned."""
187 raise Exception("Virtual get_actual_peer_addr() called!")
189 def grequest(self
, req
):
190 """Send request on the global control interface"""
191 raise Exception, "Virtual grequest() called!"
193 def wait_gevent(self
, events
, timeout
=None):
194 """Wait for a list of events on the global interface"""
195 raise Exception("Virtual wait_gevent() called!")
197 def request(self
, req
):
198 """Issue a request to the control interface"""
199 h
= self
.get_instance()
200 return h
.request(req
)
202 def wait_event(self
, events
, timeout
=None):
203 """Wait for an event from the control interface"""
204 h
= self
.get_instance()
205 if timeout
is not None:
206 return h
.wait_event(events
, timeout
=timeout
)
208 return h
.wait_event(events
)
210 def set_old_peer_addr(self
, peer_addr
=None):
211 """Sets the peer address"""
212 if peer_addr
is not None:
213 self
.old_peer_addr
= peer_addr
215 self
.old_peer_addr
= self
.get_actual_peer_addr()
217 def set_new_peer_addr(self
, peer_addr
=None):
218 """Sets the peer address"""
219 if peer_addr
is not None:
220 self
.new_peer_addr
= peer_addr
222 self
.new_peer_addr
= self
.get_actual_peer_addr()
224 def add_peer(self
, obj
, old_peer_addr
=None, new_peer_addr
=None):
225 """Add peer for FST session(s). 'obj' is a FstDevice subclass object.
226 The method must be called before add_session().
227 If peer_addr is not specified, the address of the currently connected
229 if not isinstance(obj
, FstDevice
):
230 raise Exception("Peer must be a FstDevice object")
232 self
.set_old_peer_addr(old_peer_addr
)
233 self
.set_new_peer_addr(new_peer_addr
)
236 """Returns peer object"""
239 def set_fst_parameters(self
, group_id
=None, pri
=None, llt
=None):
240 """Change/set new FST parameters. Can be used to start FST sessions with
241 different FST parameters than defined in the configuration file."""
242 if group_id
is not None:
243 self
.fst_group
= group_id
249 def get_local_mbies(self
, ifname
=None):
250 if_name
= ifname
if ifname
is not None else self
.iface
251 return self
.grequest("FST-MANAGER TEST_REQUEST GET_LOCAL_MBIES " + if_name
)
253 def add_session(self
):
254 """Adds an FST session. add_peer() must be called calling this
256 if self
.peer_obj
is None:
257 raise Exception("Peer wasn't added before starting session")
259 grp
= ' ' + self
.fst_group
if self
.fst_group
!= '' else ''
260 sid
= self
.grequest("FST-MANAGER SESSION_ADD" + grp
)
262 if sid
.startswith("FAIL"):
263 raise Exception("Cannot add FST session with groupid ==" + grp
)
267 def set_session_param(self
, params
):
268 request
= "FST-MANAGER SESSION_SET"
269 if params
is not None and params
!= '':
270 request
= request
+ ' ' + params
271 return self
.grequest(request
)
273 def get_session_params(self
, sid
):
274 request
= "FST-MANAGER SESSION_GET " + sid
275 res
= self
.grequest(request
)
276 if res
.startswith("FAIL"):
279 for i
in res
.splitlines():
284 def iface_peers(self
, ifname
):
285 grp
= self
.fst_group
if self
.fst_group
!= '' else ''
286 res
= self
.grequest("FST-MANAGER IFACE_PEERS " + grp
+ ' ' + ifname
)
287 if res
.startswith("FAIL"):
289 return res
.splitlines()
291 def get_peer_mbies(self
, ifname
, peer_addr
):
292 return self
.grequest("FST-MANAGER GET_PEER_MBIES %s %s" % (ifname
, peer_addr
))
294 def list_ifaces(self
):
295 grp
= self
.fst_group
if self
.fst_group
!= '' else ''
296 res
= self
.grequest("FST-MANAGER LIST_IFACES " + grp
)
297 if res
.startswith("FAIL"):
300 for i
in res
.splitlines():
304 iface
['priority'] = p
[1]
309 def list_groups(self
):
310 res
= self
.grequest("FST-MANAGER LIST_GROUPS")
311 if res
.startswith("FAIL"):
313 return res
.splitlines()
315 def configure_session(self
, sid
, new_iface
, old_iface
= None):
316 """Calls session_set for a number of parameters some of which are stored
317 in "self" while others are passed to this function explicitly. If
318 old_iface is None, current iface is used; if old_iface is an empty
321 oldiface
= old_iface
if old_iface
is not None else self
.iface
322 s
= self
.set_session_param(sid
+ ' old_ifname=' + oldiface
)
323 if not s
.startswith("OK"):
324 raise Exception("Cannot set FST session old_ifname: " + s
)
325 if new_iface
is not None:
326 s
= self
.set_session_param(sid
+ " new_ifname=" + new_iface
)
327 if not s
.startswith("OK"):
328 raise Exception("Cannot set FST session new_ifname:" + s
)
329 if self
.new_peer_addr
is not None and self
.new_peer_addr
!= '':
330 s
= self
.set_session_param(sid
+ " new_peer_addr=" + self
.new_peer_addr
)
331 if not s
.startswith("OK"):
332 raise Exception("Cannot set FST session peer address:" + s
+ " (new)")
333 if self
.old_peer_addr
is not None and self
.old_peer_addr
!= '':
334 s
= self
.set_session_param(sid
+ " old_peer_addr=" + self
.old_peer_addr
)
335 if not s
.startswith("OK"):
336 raise Exception("Cannot set FST session peer address:" + s
+ " (old)")
337 if self
.fst_llt
is not None and self
.fst_llt
!= '':
338 s
= self
.set_session_param(sid
+ " llt=" + self
.fst_llt
)
339 if not s
.startswith("OK"):
340 raise Exception("Cannot set FST session llt:" + s
)
343 def send_iface_attach_request(self
, ifname
, group
, llt
, priority
):
344 request
= "FST-ATTACH " + ifname
+ ' ' + group
346 request
+= " llt=" + llt
347 if priority
is not None:
348 request
+= " priority=" + priority
349 res
= self
.grequest(request
)
350 if not res
.startswith("OK"):
351 raise Exception("Cannot attach FST iface: " + res
)
353 def send_iface_detach_request(self
, ifname
):
354 res
= self
.grequest("FST-DETACH " + ifname
)
355 if not res
.startswith("OK"):
356 raise Exception("Cannot detach FST iface: " + res
)
358 def send_session_setup_request(self
, sid
):
359 s
= self
.grequest("FST-MANAGER SESSION_INITIATE " + sid
)
360 if not s
.startswith('OK'):
361 raise Exception("Cannot send setup request: %s" % s
)
364 def send_session_setup_response(self
, sid
, response
):
365 request
= "FST-MANAGER SESSION_RESPOND " + sid
+ " " + response
366 s
= self
.grequest(request
)
367 if not s
.startswith('OK'):
368 raise Exception("Cannot send setup response: %s" % s
)
371 def send_test_session_setup_request(self
, fsts_id
,
372 additional_parameter
= None):
373 request
= "FST-MANAGER TEST_REQUEST SEND_SETUP_REQUEST " + fsts_id
374 if additional_parameter
is not None:
375 request
+= " " + additional_parameter
376 s
= self
.grequest(request
)
377 if not s
.startswith('OK'):
378 raise Exception("Cannot send FST setup request: %s" % s
)
381 def send_test_session_setup_response(self
, fsts_id
,
382 response
, additional_parameter
= None):
383 request
= "FST-MANAGER TEST_REQUEST SEND_SETUP_RESPONSE " + fsts_id
+ " " + response
384 if additional_parameter
is not None:
385 request
+= " " + additional_parameter
386 s
= self
.grequest(request
)
387 if not s
.startswith('OK'):
388 raise Exception("Cannot send FST setup response: %s" % s
)
391 def send_test_ack_request(self
, fsts_id
):
392 s
= self
.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_REQUEST " + fsts_id
)
393 if not s
.startswith('OK'):
394 raise Exception("Cannot send FST ack request: %s" % s
)
397 def send_test_ack_response(self
, fsts_id
):
398 s
= self
.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_RESPONSE " + fsts_id
)
399 if not s
.startswith('OK'):
400 raise Exception("Cannot send FST ack response: %s" % s
)
403 def send_test_tear_down(self
, fsts_id
):
404 s
= self
.grequest("FST-MANAGER TEST_REQUEST SEND_TEAR_DOWN " + fsts_id
)
405 if not s
.startswith('OK'):
406 raise Exception("Cannot send FST tear down: %s" % s
)
409 def get_fsts_id_by_sid(self
, sid
):
410 s
= self
.grequest("FST-MANAGER TEST_REQUEST GET_FSTS_ID " + sid
)
411 if s
== ' ' or s
.startswith('FAIL'):
412 raise Exception("Cannot get fsts_id for sid == %s" % sid
)
415 def wait_for_iface_event(self
, timeout
):
417 ev
= self
.wait_gevent(["FST-EVENT-IFACE"], timeout
)
419 raise Exception("No FST-EVENT-IFACE received")
420 event
= parse_fst_iface_event(ev
)
422 # We can't parse so it's not our event, wait for next one
426 def wait_for_session_event(self
, timeout
, events_to_ignore
=[],
429 ev
= self
.wait_gevent(["FST-EVENT-SESSION"], timeout
)
431 raise Exception("No FST-EVENT-SESSION received")
432 event
= parse_fst_session_event(ev
)
434 # We can't parse so it's not our event, wait for next one
436 if len(events_to_ignore
) > 0:
437 if event
['type'] in events_to_ignore
:
439 elif len(events_to_count
) > 0:
440 if not event
['type'] in events_to_count
:
444 def initiate_session(self
, sid
, response
="accept"):
445 """Initiates FST session with given session id 'sid'.
446 'response' is the session respond answer: "accept", "reject", or a
447 special "timeout" value to skip the response in order to test session
449 Returns: "OK" - session has been initiated, otherwise the reason for the
450 reset: REASON_REJECT, REASON_STT."""
451 strsid
= ' ' + sid
if sid
!= '' else ''
452 s
= self
.grequest("FST-MANAGER SESSION_INITIATE"+ strsid
)
453 if not s
.startswith('OK'):
454 raise Exception("Cannot initiate fst session: %s" % s
)
455 ev
= self
.peer_obj
.wait_gevent([ "FST-EVENT-SESSION" ], timeout
=5)
457 raise Exception("No FST-EVENT-SESSION received")
459 event
= parse_fst_session_event(ev
)
461 raise Exception("Unrecognized FST event: " % ev
)
462 if event
['type'] != 'EVENT_FST_SETUP':
463 raise Exception("Expected FST_SETUP event, got: " + event
['type'])
464 ev
= self
.peer_obj
.wait_gevent(["FST-EVENT-SESSION"], timeout
=5)
466 raise Exception("No FST-EVENT-SESSION received")
467 event
= parse_fst_session_event(ev
)
469 raise Exception("Unrecognized FST event: " % ev
)
470 if event
['type'] != 'EVENT_FST_SESSION_STATE':
471 raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event
['type'])
472 if event
['new_state'] != "SETUP_COMPLETION":
473 raise Exception("Expected new state SETUP_COMPLETION, got: " + event
['new_state'])
476 if response
!= "timeout":
477 s
= self
.peer_obj
.grequest("FST-MANAGER SESSION_RESPOND "+ event
['id'] + " " + response
) # Or reject
478 if not s
.startswith('OK'):
479 raise Exception("Error session_respond: %s" % s
)
480 # Wait for EVENT_FST_SESSION_STATE events. We should get at least 2
481 # events. The 1st event will be EVENT_FST_SESSION_STATE
482 # old_state=INITIAL new_state=SETUP_COMPLETED. The 2nd event will be
483 # either EVENT_FST_ESTABLISHED with the session id or
484 # EVENT_FST_SESSION_STATE with new_state=INITIAL if the session was
485 # reset, the reason field will tell why.
488 ev
= self
.wait_gevent(["FST-EVENT-SESSION"], timeout
=5)
490 break # No session event received
491 event
= parse_fst_session_event(ev
)
493 # We can't parse so it's not our event, wait for next one
495 if event
['type'] == 'EVENT_FST_ESTABLISHED':
498 elif event
['type'] == "EVENT_FST_SESSION_STATE":
499 if event
['new_state'] == "INITIAL":
500 # Session was reset, the only reason to get back to initial
502 result
= event
['reason']
505 raise Exception("No event for session respond")
508 def transfer_session(self
, sid
):
509 """Transfers the session. 'sid' is the session id. 'hsta' is the
510 station-responder object.
511 Returns: REASON_SWITCH - the session has been transferred successfully
512 or a REASON_... reported by the reset event."""
513 request
= "FST-MANAGER SESSION_TRANSFER"
517 s
= self
.grequest(request
)
518 if not s
.startswith('OK'):
519 raise Exception("Cannot transfer fst session: %s" % s
)
522 ev
= self
.peer_obj
.wait_gevent([ "FST-EVENT-SESSION" ], timeout
=5)
524 raise Exception("Missing session transfer event")
525 # We got FST event. We expect TRANSITION_CONFIRMED state and then
526 # INITIAL (reset) with the reason (e.g. "REASON_SWITCH").
527 # Right now we'll be waiting for the reset event and record the
529 event
= parse_fst_session_event(ev
)
531 raise Exception("Unrecognized FST event: " % ev
)
532 if event
['new_state'] == 'INITIAL':
533 result
= event
['reason']
537 def wait_for_tear_down(self
):
538 ev
= self
.wait_gevent([ "FST-EVENT-SESSION" ], timeout
=5)
540 raise Exception("No FST-EVENT-SESSION received")
542 event
= parse_fst_session_event(ev
)
544 raise Exception("Unrecognized FST event: " % ev
)
545 if event
['type'] != 'EVENT_FST_SESSION_STATE':
546 raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event
['type'])
547 if event
['new_state'] != "INITIAL":
548 raise Exception("Expected new state INITIAL, got: " + event
['new_state'])
549 if event
['reason'] != 'REASON_TEARDOWN':
550 raise Exception("Expected reason REASON_TEARDOWN, got: " + event
['reason'])
552 def teardown_session(self
, sid
):
553 """Tears down FST session with a given session id ('sid')"""
554 strsid
= ' ' + sid
if sid
!= '' else ''
555 s
= self
.grequest("FST-MANAGER SESSION_TEARDOWN" + strsid
)
556 if not s
.startswith('OK'):
557 raise Exception("Cannot tear down fst session: %s" % s
)
558 self
.peer_obj
.wait_for_tear_down()
561 def remove_session(self
, sid
, wait_for_tear_down
=True):
562 """Removes FST session with a given session id ('sid')"""
563 strsid
= ' ' + sid
if sid
!= '' else ''
564 s
= self
.grequest("FST-MANAGER SESSION_REMOVE" + strsid
)
565 if not s
.startswith('OK'):
566 raise Exception("Cannot remove fst session: %s" % s
)
567 if wait_for_tear_down
== True:
568 self
.peer_obj
.wait_for_tear_down()
570 def remove_all_sessions(self
):
571 """Removes FST session with a given session id ('sid')"""
572 grp
= ' ' + self
.fst_group
if self
.fst_group
!= '' else ''
573 s
= self
.grequest("FST-MANAGER LIST_SESSIONS" + grp
)
574 if not s
.startswith('FAIL'):
575 for sid
in s
.splitlines():
578 self
.remove_session(sid
, wait_for_tear_down
=False)
584 class FstAP (FstDevice
):
585 def __init__(self
, iface
, ssid
, mode
, chan
, fst_group
, fst_pri
,
586 fst_llt
=None, rsn
=False):
587 """If fst_group is empty, then FST parameters will not be set
588 If fst_llt is empty, the parameter will not be set and the default value
589 is expected to be configured."""
593 self
.reg_ctrl
= fst_test_common
.HapdRegCtrl()
594 self
.reg_ctrl
.add_ap(iface
, self
.chan
)
595 self
.global_instance
= hostapd
.HostapdGlobal()
596 FstDevice
.__init
__(self
, iface
, fst_group
, fst_pri
, fst_llt
, rsn
)
598 def start(self
, return_early
=False):
599 """Starts AP the "standard" way as it was intended by hostapd tests.
600 This will work only when FST supports fully dynamically loading
601 parameters in hostapd."""
603 params
['ssid'] = self
.ssid
604 params
['hw_mode'] = self
.mode
605 params
['channel'] = self
.chan
606 params
['country_code'] = 'US'
609 params
['wpa_key_mgmt'] = 'WPA-PSK'
610 params
['rsn_pairwise'] = 'CCMP'
611 params
['wpa_passphrase'] = '12345678'
612 self
.hapd
=hostapd
.add_ap(self
.iface
, params
)
613 if not self
.hapd
.ping():
614 raise Exception("Could not ping FST hostapd")
615 self
.reg_ctrl
.start()
616 self
.get_global_instance()
619 if len(self
.fst_group
) != 0:
620 self
.send_iface_attach_request(self
.iface
, self
.fst_group
,
621 self
.fst_llt
, self
.fst_pri
)
625 """Removes the AP, To be used when dynamic fst APs are implemented in
627 if len(self
.fst_group
) != 0:
628 self
.remove_all_sessions()
630 self
.send_iface_detach_request(self
.iface
)
634 del self
.global_instance
635 self
.global_instance
= None
637 def get_instance(self
):
638 """Return the Hostapd/WpaSupplicant instance"""
639 if self
.instance
is None:
640 self
.instance
= hostapd
.Hostapd(self
.iface
)
643 def get_global_instance(self
):
644 return self
.global_instance
646 def get_own_mac_address(self
):
647 """Gets the device's own MAC address"""
648 h
= self
.get_instance()
649 status
= h
.get_status()
650 return status
['bssid[0]']
652 def get_actual_peer_addr(self
):
653 """Gets the peer address. A connected station address is returned."""
654 # Use the device instance, the global control interface doesn't have
656 h
= self
.get_instance()
657 sta
= h
.get_sta(None)
658 if sta
is None or 'addr' not in sta
:
659 # Maybe station is not connected?
665 def grequest(self
, req
):
666 """Send request on the global control interface"""
667 logger
.debug("FstAP::grequest: " + req
)
668 h
= self
.get_global_instance()
669 return h
.request(req
)
671 def wait_gevent(self
, events
, timeout
=None):
672 """Wait for a list of events on the global interface"""
673 h
= self
.get_global_instance()
674 if timeout
is not None:
675 return h
.wait_event(events
, timeout
=timeout
)
677 return h
.wait_event(events
)
682 def dump_monitor(self
):
683 """Dump control interface monitor events"""
685 self
.instance
.dump_monitor()
690 class FstSTA (FstDevice
):
691 def __init__(self
, iface
, fst_group
, fst_pri
, fst_llt
=None, rsn
=False):
692 """If fst_group is empty, then FST parameters will not be set
693 If fst_llt is empty, the parameter will not be set and the default value
694 is expected to be configured."""
695 FstDevice
.__init
__(self
, iface
, fst_group
, fst_pri
, fst_llt
, rsn
)
696 self
.connected
= None # FstAP object the station is connected to
699 """Current implementation involves running another instance of
700 wpa_supplicant with fixed FST STAs configurations. When any type of
701 dynamic STA loading is implemented, rewrite the function similarly to
703 h
= self
.get_instance()
704 h
.interface_add(self
.iface
, drv_params
="force_connect_cmd=1")
705 if not h
.global_ping():
706 raise Exception("Could not ping FST wpa_supplicant")
707 if len(self
.fst_group
) != 0:
708 self
.send_iface_attach_request(self
.iface
, self
.fst_group
,
709 self
.fst_llt
, self
.fst_pri
)
713 """Removes the STA. In a static (temporary) implementation does nothing,
714 the STA will be removed when the fst wpa_supplicant process is killed by
716 h
= self
.get_instance()
718 if len(self
.fst_group
) != 0:
719 self
.remove_all_sessions()
720 self
.send_iface_detach_request(self
.iface
)
722 h
.interface_remove(self
.iface
)
727 def get_instance(self
):
728 """Return the Hostapd/WpaSupplicant instance"""
729 if self
.instance
is None:
730 self
.instance
= WpaSupplicant(global_iface
='/tmp/wpas-wlan5')
733 def get_own_mac_address(self
):
734 """Gets the device's own MAC address"""
735 h
= self
.get_instance()
736 status
= h
.get_status()
737 return status
['address']
739 def get_actual_peer_addr(self
):
740 """Gets the peer address. A connected station address is returned"""
741 h
= self
.get_instance()
742 status
= h
.get_status()
743 return status
['bssid']
745 def grequest(self
, req
):
746 """Send request on the global control interface"""
747 logger
.debug("FstSTA::grequest: " + req
)
748 h
= self
.get_instance()
749 return h
.global_request(req
)
751 def wait_gevent(self
, events
, timeout
=None):
752 """Wait for a list of events on the global interface"""
753 h
= self
.get_instance()
754 if timeout
is not None:
755 return h
.wait_global_event(events
, timeout
=timeout
)
757 return h
.wait_global_event(events
)
759 def scan(self
, freq
=None, no_wait
=False, only_new
=False):
760 """Issue Scan with given parameters. Returns the BSS dictionary for the
761 AP found (the 1st BSS found. TODO: What if the AP required is not the
762 1st in list?) or None if no BSS found. None call be also a result of
763 no_wait=True. Note, request("SCAN_RESULTS") can be used to get all the
765 h
= self
.get_instance()
767 h
.scan(None, freq
, no_wait
, only_new
)
772 def connect(self
, ap
, **kwargs
):
773 """Connects to the given AP"""
774 if not isinstance(ap
, FstAP
):
775 raise Exception("Bad AP object to connect to")
776 h
= self
.get_instance()
777 hap
= ap
.get_instance()
779 h
.connect(ap
.get_ssid(), **kwargs
)
783 def connect_to_external_ap(self
, ap
, ssid
, check_connection
=True, **kwargs
):
784 """Connects to the given external AP"""
785 if not isinstance(ap
, hostapd
.Hostapd
):
786 raise Exception("Bad AP object to connect to")
787 h
= self
.get_instance()
789 h
.connect(ssid
, **kwargs
)
792 ev
= ap
.wait_event([ "AP-STA-CONNECTED" ], timeout
=10)
794 self
.connected
= None
795 raise Exception("No connection event received from %s" % ssid
)
798 def disconnect(self
, check_disconnect
=True):
799 """Disconnects from the AP the station is currently connected to"""
800 if self
.connected
is not None:
801 h
= self
.get_instance()
803 h
.request("DISCONNECT")
805 hap
= self
.connected
.get_instance()
806 ev
= hap
.wait_event([ "AP-STA-DISCONNECTED" ], timeout
=10)
808 raise Exception("No disconnection event received from %s" % self
.connected
.get_ssid())
810 self
.connected
= None
813 def disconnect_from_external_ap(self
, check_disconnect
=True):
814 """Disconnects from the external AP the station is currently connected
816 if self
.connected
is not None:
817 h
= self
.get_instance()
819 h
.request("DISCONNECT")
822 ev
= hap
.wait_event([ "AP-STA-DISCONNECTED" ], timeout
=10)
824 raise Exception("No disconnection event received from AP")
826 self
.connected
= None
828 def dump_monitor(self
):
829 """Dump control interface monitor events"""
831 self
.instance
.dump_monitor()