]> git.ipfire.org Git - thirdparty/hostap.git/blame_incremental - tests/hwsim/fst_module_aux.py
Fix PMKID addition to RSN element when RSN Capabilities are not present
[thirdparty/hostap.git] / tests / hwsim / fst_module_aux.py
... / ...
CommitLineData
1# FST tests related classes
2# Copyright (c) 2015, Qualcomm Atheros, Inc.
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import logging
8import subprocess
9import os
10import signal
11import time
12import re
13
14import hostapd
15import wpaspy
16import utils
17from wpasupplicant import WpaSupplicant
18
19import fst_test_common
20
21logger = logging.getLogger()
22
23def parse_fst_iface_event(ev):
24 """Parses FST iface event that comes as a string, e.g.
25 "<3>FST-EVENT-IFACE attached ifname=wlan9 group=fstg0"
26 Returns a dictionary with parsed "event_type", "ifname", and "group"; or
27 None if not an FST event or can't be parsed."""
28 event = {}
29 if ev.find("FST-EVENT-IFACE") == -1:
30 return None
31 if ev.find("attached") != -1:
32 event['event_type'] = 'attached'
33 elif ev.find("detached") != -1:
34 event['event_type'] = 'detached'
35 else:
36 return None
37 f = re.search("ifname=(\S+)", ev)
38 if f is not None:
39 event['ifname'] = f.group(1)
40 f = re.search("group=(\S+)", ev)
41 if f is not None:
42 event['group'] = f.group(1)
43 return event
44
45def parse_fst_session_event(ev):
46 """Parses FST session event that comes as a string, e.g.
47 "<3>FST-EVENT-SESSION event_type=EVENT_FST_SESSION_STATE session_id=0 reason=REASON_STT"
48 Returns a dictionary with parsed "type", "id", and "reason"; or None if not
49 a FST event or can't be parsed"""
50 event = {}
51 if ev.find("FST-EVENT-SESSION") == -1:
52 return None
53 event['new_state'] = '' # The field always exists in the dictionary
54 f = re.search("event_type=(\S+)", ev)
55 if f is None:
56 return None
57 event['type'] = f.group(1)
58 f = re.search("session_id=(\d+)", ev)
59 if f is not None:
60 event['id'] = f.group(1)
61 f = re.search("old_state=(\S+)", ev)
62 if f is not None:
63 event['old_state'] = f.group(1)
64 f = re.search("new_state=(\S+)", ev)
65 if f is not None:
66 event['new_state'] = f.group(1)
67 f = re.search("reason=(\S+)", ev)
68 if f is not None:
69 event['reason'] = f.group(1)
70 return event
71
72def start_two_ap_sta_pairs(apdev, rsn=False):
73 """auxiliary function that creates two pairs of APs and STAs"""
74 ap1 = FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
75 fst_test_common.fst_test_def_chan_a,
76 fst_test_common.fst_test_def_group,
77 fst_test_common.fst_test_def_prio_low,
78 fst_test_common.fst_test_def_llt, rsn=rsn)
79 ap1.start()
80 ap2 = FstAP(apdev[1]['ifname'], 'fst_11g', 'g',
81 fst_test_common.fst_test_def_chan_g,
82 fst_test_common.fst_test_def_group,
83 fst_test_common.fst_test_def_prio_high,
84 fst_test_common.fst_test_def_llt, rsn=rsn)
85 ap2.start()
86
87 sta1 = FstSTA('wlan5',
88 fst_test_common.fst_test_def_group,
89 fst_test_common.fst_test_def_prio_low,
90 fst_test_common.fst_test_def_llt, rsn=rsn)
91 sta1.start()
92 sta2 = FstSTA('wlan6',
93 fst_test_common.fst_test_def_group,
94 fst_test_common.fst_test_def_prio_high,
95 fst_test_common.fst_test_def_llt, rsn=rsn)
96 sta2.start()
97
98 return ap1, ap2, sta1, sta2
99
100def stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2):
101 sta1.stop()
102 sta2.stop()
103 ap1.stop()
104 ap2.stop()
105
106def connect_two_ap_sta_pairs(ap1, ap2, dev1, dev2, rsn=False):
107 """Connects a pair of stations, each one to a separate AP"""
108 dev1.scan(freq=fst_test_common.fst_test_def_freq_a)
109 dev2.scan(freq=fst_test_common.fst_test_def_freq_g)
110
111 if rsn:
112 dev1.connect(ap1, psk="12345678",
113 scan_freq=fst_test_common.fst_test_def_freq_a)
114 dev2.connect(ap2, psk="12345678",
115 scan_freq=fst_test_common.fst_test_def_freq_g)
116 else:
117 dev1.connect(ap1, key_mgmt="NONE",
118 scan_freq=fst_test_common.fst_test_def_freq_a)
119 dev2.connect(ap2, key_mgmt="NONE",
120 scan_freq=fst_test_common.fst_test_def_freq_g)
121
122def disconnect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
123 dev1.disconnect()
124 dev2.disconnect()
125
126def external_sta_connect(sta, ap, **kwargs):
127 """Connects the external station to the given AP"""
128 if not isinstance(sta, WpaSupplicant):
129 raise Exception("Bad STA object")
130 if not isinstance(ap, FstAP):
131 raise Exception("Bad AP object to connect to")
132 hap = ap.get_instance()
133 sta.connect(ap.get_ssid(), **kwargs)
134
135def disconnect_external_sta(sta, ap, check_disconnect=True):
136 """Disconnects the external station from the AP"""
137 if not isinstance(sta, WpaSupplicant):
138 raise Exception("Bad STA object")
139 if not isinstance(ap, FstAP):
140 raise Exception("Bad AP object to connect to")
141 sta.request("DISCONNECT")
142 if check_disconnect:
143 hap = ap.get_instance()
144 ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
145 if ev is None:
146 raise Exception("No disconnection event received from %s" % ap.get_ssid())
147
148#
149# FstDevice class
150# This is the parent class for the AP (FstAP) and STA (FstSTA) that implements
151# FST functionality.
152#
153class FstDevice:
154 def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
155 self.iface = iface
156 self.fst_group = fst_group
157 self.fst_pri = fst_pri
158 self.fst_llt = fst_llt # None llt means no llt parameter will be set
159 self.instance = None # Hostapd/WpaSupplicant instance
160 self.peer_obj = None # Peer object, must be a FstDevice child object
161 self.new_peer_addr = None # Peer MAC address for new session iface
162 self.old_peer_addr = None # Peer MAC address for old session iface
163 self.role = 'initiator' # Role: initiator/responder
164 s = self.grequest("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
165 if not s.startswith('OK'):
166 raise utils.HwsimSkip("FST not supported")
167 self.rsn = rsn
168
169 def ifname(self):
170 return self.iface
171
172 def get_instance(self):
173 """Gets the Hostapd/WpaSupplicant instance"""
174 raise Exception("Virtual get_instance() called!")
175
176 def get_own_mac_address(self):
177 """Gets the device's own MAC address"""
178 raise Exception("Virtual get_own_mac_address() called!")
179
180 def get_new_peer_addr(self):
181 return self.new_peer_addr
182
183 def get_old_peer_addr(self):
184 return self.old_peer_addr
185
186 def get_actual_peer_addr(self):
187 """Gets the peer address. A connected AP/station address is returned."""
188 raise Exception("Virtual get_actual_peer_addr() called!")
189
190 def grequest(self, req):
191 """Send request on the global control interface"""
192 raise Exception, "Virtual grequest() called!"
193
194 def wait_gevent(self, events, timeout=None):
195 """Wait for a list of events on the global interface"""
196 raise Exception("Virtual wait_gevent() called!")
197
198 def request(self, req):
199 """Issue a request to the control interface"""
200 h = self.get_instance()
201 return h.request(req)
202
203 def wait_event(self, events, timeout=None):
204 """Wait for an event from the control interface"""
205 h = self.get_instance()
206 if timeout is not None:
207 return h.wait_event(events, timeout=timeout)
208 else:
209 return h.wait_event(events)
210
211 def set_old_peer_addr(self, peer_addr=None):
212 """Sets the peer address"""
213 if peer_addr is not None:
214 self.old_peer_addr = peer_addr
215 else:
216 self.old_peer_addr = self.get_actual_peer_addr()
217
218 def set_new_peer_addr(self, peer_addr=None):
219 """Sets the peer address"""
220 if peer_addr is not None:
221 self.new_peer_addr = peer_addr
222 else:
223 self.new_peer_addr = self.get_actual_peer_addr()
224
225 def add_peer(self, obj, old_peer_addr=None, new_peer_addr=None):
226 """Add peer for FST session(s). 'obj' is a FstDevice subclass object.
227 The method must be called before add_session().
228 If peer_addr is not specified, the address of the currently connected
229 station is used."""
230 if not isinstance(obj, FstDevice):
231 raise Exception("Peer must be a FstDevice object")
232 self.peer_obj = obj
233 self.set_old_peer_addr(old_peer_addr)
234 self.set_new_peer_addr(new_peer_addr)
235
236 def get_peer(self):
237 """Returns peer object"""
238 return self.peer_obj
239
240 def set_fst_parameters(self, group_id=None, pri=None, llt=None):
241 """Change/set new FST parameters. Can be used to start FST sessions with
242 different FST parameters than defined in the configuration file."""
243 if group_id is not None:
244 self.fst_group = group_id
245 if pri is not None:
246 self.fst_pri = pri
247 if llt is not None:
248 self.fst_llt = llt
249
250 def get_local_mbies(self, ifname=None):
251 if_name = ifname if ifname is not None else self.iface
252 return self.grequest("FST-MANAGER TEST_REQUEST GET_LOCAL_MBIES " + if_name)
253
254 def add_session(self):
255 """Adds an FST session. add_peer() must be called calling this
256 function"""
257 if self.peer_obj is None:
258 raise Exception("Peer wasn't added before starting session")
259 grp = ' ' + self.fst_group if self.fst_group != '' else ''
260 sid = self.grequest("FST-MANAGER SESSION_ADD" + grp)
261 sid = sid.strip()
262 if sid.startswith("FAIL"):
263 raise Exception("Cannot add FST session with groupid ==" + grp)
264 return sid
265
266 def set_session_param(self, params):
267 request = "FST-MANAGER SESSION_SET"
268 if params is not None and params != '':
269 request = request + ' ' + params
270 return self.grequest(request)
271
272 def get_session_params(self, sid):
273 request = "FST-MANAGER SESSION_GET " + sid
274 res = self.grequest(request)
275 if res.startswith("FAIL"):
276 return None
277 params = {}
278 for i in res.splitlines():
279 p = i.split('=')
280 params[p[0]] = p[1]
281 return params
282
283 def iface_peers(self, ifname):
284 grp = self.fst_group if self.fst_group != '' else ''
285 res = self.grequest("FST-MANAGER IFACE_PEERS " + grp + ' ' + ifname)
286 if res.startswith("FAIL"):
287 return None
288 return res.splitlines()
289
290 def get_peer_mbies(self, ifname, peer_addr):
291 return self.grequest("FST-MANAGER GET_PEER_MBIES %s %s" % (ifname, peer_addr))
292
293 def list_ifaces(self):
294 grp = self.fst_group if self.fst_group != '' else ''
295 res = self.grequest("FST-MANAGER LIST_IFACES " + grp)
296 if res.startswith("FAIL"):
297 return None
298 ifaces = []
299 for i in res.splitlines():
300 p = i.split(':')
301 iface = {}
302 iface['name'] = p[0]
303 iface['priority'] = p[1]
304 iface['llt'] = p[2]
305 ifaces.append(iface)
306 return ifaces
307
308 def list_groups(self):
309 res = self.grequest("FST-MANAGER LIST_GROUPS")
310 if res.startswith("FAIL"):
311 return None
312 return res.splitlines()
313
314 def configure_session(self, sid, new_iface, old_iface = None):
315 """Calls session_set for a number of parameters some of which are stored
316 in "self" while others are passed to this function explicitly. If
317 old_iface is None, current iface is used; if old_iface is an empty
318 string."""
319 oldiface = old_iface if old_iface is not None else self.iface
320 s = self.set_session_param(sid + ' old_ifname=' + oldiface)
321 if not s.startswith("OK"):
322 raise Exception("Cannot set FST session old_ifname: " + s)
323 if new_iface is not None:
324 s = self.set_session_param(sid + " new_ifname=" + new_iface)
325 if not s.startswith("OK"):
326 raise Exception("Cannot set FST session new_ifname:" + s)
327 if self.new_peer_addr is not None and self.new_peer_addr != '':
328 s = self.set_session_param(sid + " new_peer_addr=" + self.new_peer_addr)
329 if not s.startswith("OK"):
330 raise Exception("Cannot set FST session peer address:" + s + " (new)")
331 if self.old_peer_addr is not None and self.old_peer_addr != '':
332 s = self.set_session_param(sid + " old_peer_addr=" + self.old_peer_addr)
333 if not s.startswith("OK"):
334 raise Exception("Cannot set FST session peer address:" + s + " (old)")
335 if self.fst_llt is not None and self.fst_llt != '':
336 s = self.set_session_param(sid + " llt=" + self.fst_llt)
337 if not s.startswith("OK"):
338 raise Exception("Cannot set FST session llt:" + s)
339
340 def send_iface_attach_request(self, ifname, group, llt, priority):
341 request = "FST-ATTACH " + ifname + ' ' + group
342 if llt is not None:
343 request += " llt=" + llt
344 if priority is not None:
345 request += " priority=" + priority
346 res = self.grequest(request)
347 if not res.startswith("OK"):
348 raise Exception("Cannot attach FST iface: " + res)
349
350 def send_iface_detach_request(self, ifname):
351 res = self.grequest("FST-DETACH " + ifname)
352 if not res.startswith("OK"):
353 raise Exception("Cannot detach FST iface: " + res)
354
355 def send_session_setup_request(self, sid):
356 s = self.grequest("FST-MANAGER SESSION_INITIATE " + sid)
357 if not s.startswith('OK'):
358 raise Exception("Cannot send setup request: %s" % s)
359 return s
360
361 def send_session_setup_response(self, sid, response):
362 request = "FST-MANAGER SESSION_RESPOND " + sid + " " + response
363 s = self.grequest(request)
364 if not s.startswith('OK'):
365 raise Exception("Cannot send setup response: %s" % s)
366 return s
367
368 def send_test_session_setup_request(self, fsts_id,
369 additional_parameter = None):
370 request = "FST-MANAGER TEST_REQUEST SEND_SETUP_REQUEST " + fsts_id
371 if additional_parameter is not None:
372 request += " " + additional_parameter
373 s = self.grequest(request)
374 if not s.startswith('OK'):
375 raise Exception("Cannot send FST setup request: %s" % s)
376 return s
377
378 def send_test_session_setup_response(self, fsts_id,
379 response, additional_parameter = None):
380 request = "FST-MANAGER TEST_REQUEST SEND_SETUP_RESPONSE " + fsts_id + " " + response
381 if additional_parameter is not None:
382 request += " " + additional_parameter
383 s = self.grequest(request)
384 if not s.startswith('OK'):
385 raise Exception("Cannot send FST setup response: %s" % s)
386 return s
387
388 def send_test_ack_request(self, fsts_id):
389 s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_REQUEST " + fsts_id)
390 if not s.startswith('OK'):
391 raise Exception("Cannot send FST ack request: %s" % s)
392 return s
393
394 def send_test_ack_response(self, fsts_id):
395 s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_RESPONSE " + fsts_id)
396 if not s.startswith('OK'):
397 raise Exception("Cannot send FST ack response: %s" % s)
398 return s
399
400 def send_test_tear_down(self, fsts_id):
401 s = self.grequest("FST-MANAGER TEST_REQUEST SEND_TEAR_DOWN " + fsts_id)
402 if not s.startswith('OK'):
403 raise Exception("Cannot send FST tear down: %s" % s)
404 return s
405
406 def get_fsts_id_by_sid(self, sid):
407 s = self.grequest("FST-MANAGER TEST_REQUEST GET_FSTS_ID " + sid)
408 if s == ' ' or s.startswith('FAIL'):
409 raise Exception("Cannot get fsts_id for sid == %s" % sid)
410 return int(s)
411
412 def wait_for_iface_event(self, timeout):
413 while True:
414 ev = self.wait_gevent(["FST-EVENT-IFACE"], timeout)
415 if ev is None:
416 raise Exception("No FST-EVENT-IFACE received")
417 event = parse_fst_iface_event(ev)
418 if event is None:
419 # We can't parse so it's not our event, wait for next one
420 continue
421 return event
422
423 def wait_for_session_event(self, timeout, events_to_ignore=[],
424 events_to_count=[]):
425 while True:
426 ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout)
427 if ev is None:
428 raise Exception("No FST-EVENT-SESSION received")
429 event = parse_fst_session_event(ev)
430 if event is None:
431 # We can't parse so it's not our event, wait for next one
432 continue
433 if len(events_to_ignore) > 0:
434 if event['type'] in events_to_ignore:
435 continue
436 elif len(events_to_count) > 0:
437 if not event['type'] in events_to_count:
438 continue
439 return event
440
441 def initiate_session(self, sid, response="accept"):
442 """Initiates FST session with given session id 'sid'.
443 'response' is the session respond answer: "accept", "reject", or a
444 special "timeout" value to skip the response in order to test session
445 timeouts.
446 Returns: "OK" - session has been initiated, otherwise the reason for the
447 reset: REASON_REJECT, REASON_STT."""
448 strsid = ' ' + sid if sid != '' else ''
449 s = self.grequest("FST-MANAGER SESSION_INITIATE"+ strsid)
450 if not s.startswith('OK'):
451 raise Exception("Cannot initiate fst session: %s" % s)
452 ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
453 if ev is None:
454 raise Exception("No FST-EVENT-SESSION received")
455 # We got FST event
456 event = parse_fst_session_event(ev)
457 if event == None:
458 raise Exception("Unrecognized FST event: " % ev)
459 if event['type'] != 'EVENT_FST_SETUP':
460 raise Exception("Expected FST_SETUP event, got: " + event['type'])
461 ev = self.peer_obj.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
462 if ev is None:
463 raise Exception("No FST-EVENT-SESSION received")
464 event = parse_fst_session_event(ev)
465 if event == None:
466 raise Exception("Unrecognized FST event: " % ev)
467 if event['type'] != 'EVENT_FST_SESSION_STATE':
468 raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
469 if event['new_state'] != "SETUP_COMPLETION":
470 raise Exception("Expected new state SETUP_COMPLETION, got: " + event['new_state'])
471 if response == '':
472 return 'OK'
473 if response != "timeout":
474 s = self.peer_obj.grequest("FST-MANAGER SESSION_RESPOND "+ event['id'] + " " + response) # Or reject
475 if not s.startswith('OK'):
476 raise Exception("Error session_respond: %s" % s)
477 # Wait for EVENT_FST_SESSION_STATE events. We should get at least 2
478 # events. The 1st event will be EVENT_FST_SESSION_STATE
479 # old_state=INITIAL new_state=SETUP_COMPLETED. The 2nd event will be
480 # either EVENT_FST_ESTABLISHED with the session id or
481 # EVENT_FST_SESSION_STATE with new_state=INITIAL if the session was
482 # reset, the reason field will tell why.
483 result = ''
484 while result == '':
485 ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
486 if ev is None:
487 break # No session event received
488 event = parse_fst_session_event(ev)
489 if event == None:
490 # We can't parse so it's not our event, wait for next one
491 continue
492 if event['type'] == 'EVENT_FST_ESTABLISHED':
493 result = "OK"
494 break
495 elif event['type'] == "EVENT_FST_SESSION_STATE":
496 if event['new_state'] == "INITIAL":
497 # Session was reset, the only reason to get back to initial
498 # state.
499 result = event['reason']
500 break
501 if result == '':
502 raise Exception("No event for session respond")
503 return result
504
505 def transfer_session(self, sid):
506 """Transfers the session. 'sid' is the session id. 'hsta' is the
507 station-responder object.
508 Returns: REASON_SWITCH - the session has been transferred successfully
509 or a REASON_... reported by the reset event."""
510 request = "FST-MANAGER SESSION_TRANSFER"
511 if sid != '':
512 request += ' ' + sid
513 s = self.grequest(request)
514 if not s.startswith('OK'):
515 raise Exception("Cannot transfer fst session: %s" % s)
516 result = ''
517 while result == '':
518 ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
519 if ev is None:
520 raise Exception("Missing session transfer event")
521 # We got FST event. We expect TRANSITION_CONFIRMED state and then
522 # INITIAL (reset) with the reason (e.g. "REASON_SWITCH").
523 # Right now we'll be waiting for the reset event and record the
524 # reason.
525 event = parse_fst_session_event(ev)
526 if event == None:
527 raise Exception("Unrecognized FST event: " % ev)
528 if event['new_state'] == 'INITIAL':
529 result = event['reason']
530 return result
531
532 def wait_for_tear_down(self):
533 ev = self.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
534 if ev is None:
535 raise Exception("No FST-EVENT-SESSION received")
536 # We got FST event
537 event = parse_fst_session_event(ev)
538 if event == None:
539 raise Exception("Unrecognized FST event: " % ev)
540 if event['type'] != 'EVENT_FST_SESSION_STATE':
541 raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
542 if event['new_state'] != "INITIAL":
543 raise Exception("Expected new state INITIAL, got: " + event['new_state'])
544 if event['reason'] != 'REASON_TEARDOWN':
545 raise Exception("Expected reason REASON_TEARDOWN, got: " + event['reason'])
546
547 def teardown_session(self, sid):
548 """Tears down FST session with a given session id ('sid')"""
549 strsid = ' ' + sid if sid != '' else ''
550 s = self.grequest("FST-MANAGER SESSION_TEARDOWN" + strsid)
551 if not s.startswith('OK'):
552 raise Exception("Cannot tear down fst session: %s" % s)
553 self.peer_obj.wait_for_tear_down()
554
555
556 def remove_session(self, sid, wait_for_tear_down=True):
557 """Removes FST session with a given session id ('sid')"""
558 strsid = ' ' + sid if sid != '' else ''
559 s = self.grequest("FST-MANAGER SESSION_REMOVE" + strsid)
560 if not s.startswith('OK'):
561 raise Exception("Cannot remove fst session: %s" % s)
562 if wait_for_tear_down == True:
563 self.peer_obj.wait_for_tear_down()
564
565 def remove_all_sessions(self):
566 """Removes FST session with a given session id ('sid')"""
567 grp = ' ' + self.fst_group if self.fst_group != '' else ''
568 s = self.grequest("FST-MANAGER LIST_SESSIONS" + grp)
569 if not s.startswith('FAIL'):
570 for sid in s.splitlines():
571 sid = sid.strip()
572 if len(sid) != 0:
573 self.remove_session(sid, wait_for_tear_down=False)
574
575
576#
577# FstAP class
578#
579class FstAP (FstDevice):
580 def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
581 fst_llt=None, rsn=False):
582 """If fst_group is empty, then FST parameters will not be set
583 If fst_llt is empty, the parameter will not be set and the default value
584 is expected to be configured."""
585 self.ssid = ssid
586 self.mode = mode
587 self.chan = chan
588 self.reg_ctrl = fst_test_common.HapdRegCtrl()
589 self.reg_ctrl.add_ap(iface, self.chan)
590 self.global_instance = hostapd.HostapdGlobal()
591 FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)
592
593 def start(self, return_early=False):
594 """Starts AP the "standard" way as it was intended by hostapd tests.
595 This will work only when FST supports fully dynamically loading
596 parameters in hostapd."""
597 params = {}
598 params['ssid'] = self.ssid
599 params['hw_mode'] = self.mode
600 params['channel'] = self.chan
601 params['country_code'] = 'US'
602 if self.rsn:
603 params['wpa'] = '2'
604 params['wpa_key_mgmt'] = 'WPA-PSK'
605 params['rsn_pairwise'] = 'CCMP'
606 params['wpa_passphrase'] = '12345678'
607 self.hapd=hostapd.add_ap(self.iface, params)
608 if not self.hapd.ping():
609 raise Exception("Could not ping FST hostapd")
610 self.reg_ctrl.start()
611 self.get_global_instance()
612 if return_early:
613 return self.hapd
614 if len(self.fst_group) != 0:
615 self.send_iface_attach_request(self.iface, self.fst_group,
616 self.fst_llt, self.fst_pri)
617 return self.hapd
618
619 def stop(self):
620 """Removes the AP, To be used when dynamic fst APs are implemented in
621 hostapd."""
622 if len(self.fst_group) != 0:
623 self.remove_all_sessions()
624 self.send_iface_detach_request(self.iface)
625 self.reg_ctrl.stop()
626 del self.global_instance
627 self.global_instance = None
628
629 def get_instance(self):
630 """Return the Hostapd/WpaSupplicant instance"""
631 if self.instance is None:
632 self.instance = hostapd.Hostapd(self.iface)
633 return self.instance
634
635 def get_global_instance(self):
636 return self.global_instance
637
638 def get_own_mac_address(self):
639 """Gets the device's own MAC address"""
640 h = self.get_instance()
641 status = h.get_status()
642 return status['bssid[0]']
643
644 def get_actual_peer_addr(self):
645 """Gets the peer address. A connected station address is returned."""
646 # Use the device instance, the global control interface doesn't have
647 # station address
648 h = self.get_instance()
649 sta = h.get_sta(None)
650 if sta is None or 'addr' not in sta:
651 # Maybe station is not connected?
652 addr = None
653 else:
654 addr=sta['addr']
655 return addr
656
657 def grequest(self, req):
658 """Send request on the global control interface"""
659 logger.debug("FstAP::grequest: " + req)
660 h = self.get_global_instance()
661 return h.request(req)
662
663 def wait_gevent(self, events, timeout=None):
664 """Wait for a list of events on the global interface"""
665 h = self.get_global_instance()
666 if timeout is not None:
667 return h.wait_event(events, timeout=timeout)
668 else:
669 return h.wait_event(events)
670
671 def get_ssid(self):
672 return self.ssid
673
674#
675# FstSTA class
676#
677class FstSTA (FstDevice):
678 def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
679 """If fst_group is empty, then FST parameters will not be set
680 If fst_llt is empty, the parameter will not be set and the default value
681 is expected to be configured."""
682 FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)
683 self.connected = None # FstAP object the station is connected to
684
685 def start(self):
686 """Current implementation involves running another instance of
687 wpa_supplicant with fixed FST STAs configurations. When any type of
688 dynamic STA loading is implemented, rewrite the function similarly to
689 FstAP."""
690 h = self.get_instance()
691 h.interface_add(self.iface, drv_params="force_connect_cmd=1")
692 if not h.global_ping():
693 raise Exception("Could not ping FST wpa_supplicant")
694 if len(self.fst_group) != 0:
695 self.send_iface_attach_request(self.iface, self.fst_group,
696 self.fst_llt, self.fst_pri)
697 return None
698
699 def stop(self):
700 """Removes the STA. In a static (temporary) implementation does nothing,
701 the STA will be removed when the fst wpa_supplicant process is killed by
702 fstap.cleanup()."""
703 h = self.get_instance()
704 if len(self.fst_group) != 0:
705 self.remove_all_sessions()
706 self.send_iface_detach_request(self.iface)
707 h.interface_remove(self.iface)
708 h.close_ctrl()
709 del h
710 self.instance = None
711
712 def get_instance(self):
713 """Return the Hostapd/WpaSupplicant instance"""
714 if self.instance is None:
715 self.instance = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
716 return self.instance
717
718 def get_own_mac_address(self):
719 """Gets the device's own MAC address"""
720 h = self.get_instance()
721 status = h.get_status()
722 return status['address']
723
724 def get_actual_peer_addr(self):
725 """Gets the peer address. A connected station address is returned"""
726 h = self.get_instance()
727 status = h.get_status()
728 return status['bssid']
729
730 def grequest(self, req):
731 """Send request on the global control interface"""
732 logger.debug("FstSTA::grequest: " + req)
733 h = self.get_instance()
734 return h.global_request(req)
735
736 def wait_gevent(self, events, timeout=None):
737 """Wait for a list of events on the global interface"""
738 h = self.get_instance()
739 if timeout is not None:
740 return h.wait_global_event(events, timeout=timeout)
741 else:
742 return h.wait_global_event(events)
743
744 def scan(self, freq=None, no_wait=False, only_new=False):
745 """Issue Scan with given parameters. Returns the BSS dictionary for the
746 AP found (the 1st BSS found. TODO: What if the AP required is not the
747 1st in list?) or None if no BSS found. None call be also a result of
748 no_wait=True. Note, request("SCAN_RESULTS") can be used to get all the
749 results at once."""
750 h = self.get_instance()
751 h.scan(None, freq, no_wait, only_new)
752 r = h.get_bss('0')
753 return r
754
755 def connect(self, ap, **kwargs):
756 """Connects to the given AP"""
757 if not isinstance(ap, FstAP):
758 raise Exception("Bad AP object to connect to")
759 h = self.get_instance()
760 hap = ap.get_instance()
761 h.connect(ap.get_ssid(), **kwargs)
762 self.connected = ap
763
764 def connect_to_external_ap(self, ap, ssid, check_connection=True, **kwargs):
765 """Connects to the given external AP"""
766 if not isinstance(ap, hostapd.Hostapd):
767 raise Exception("Bad AP object to connect to")
768 h = self.get_instance()
769 h.connect(ssid, **kwargs)
770 self.connected = ap
771 if check_connection:
772 ev = ap.wait_event([ "AP-STA-CONNECTED" ], timeout=10)
773 if ev is None:
774 self.connected = None
775 raise Exception("No connection event received from %s" % ssid)
776
777 def disconnect(self, check_disconnect=True):
778 """Disconnects from the AP the station is currently connected to"""
779 if self.connected is not None:
780 h = self.get_instance()
781 h.request("DISCONNECT")
782 if check_disconnect:
783 hap = self.connected.get_instance()
784 ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
785 if ev is None:
786 raise Exception("No disconnection event received from %s" % self.connected.get_ssid())
787 self.connected = None
788
789
790 def disconnect_from_external_ap(self, check_disconnect=True):
791 """Disconnects from the external AP the station is currently connected
792 to"""
793 if self.connected is not None:
794 h = self.get_instance()
795 h.request("DISCONNECT")
796 if check_disconnect:
797 hap = self.connected
798 ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
799 if ev is None:
800 raise Exception("No disconnection event received from AP")
801 self.connected = None