]> git.ipfire.org Git - thirdparty/hostap.git/blame_incremental - tests/hwsim/hostapd.py
tests: remote: Generate and send BSS configuration files
[thirdparty/hostap.git] / tests / hwsim / hostapd.py
... / ...
CommitLineData
# Python class for controlling hostapd
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
6
import binascii
import logging
import os
import re
import struct
import subprocess
import time

import remotehost
import utils
import wpaspy
17
# Root logger used by every helper in this module.
logger = logging.getLogger()
# Directory that holds the per-interface hostapd control sockets; also
# passed to hostapd in the global "ADD <ifname> <dir>" command.
hapd_ctrl = '/var/run/hostapd'
# Default path of the hostapd global control interface socket.
hapd_global = '/var/run/hostapd-global'
21
def mac2tuple(mac):
    """Convert a colon-separated MAC address string into a tuple of ints.

    The colons are stripped and the remaining hex digits are decoded, so
    "02:00:00:00:00:12" becomes (2, 0, 0, 0, 0, 18).  The address must
    decode to exactly six octets; otherwise struct.error (wrong length) or
    binascii.Error (invalid hex) is raised.
    """
    return struct.unpack('6B', binascii.unhexlify(mac.replace(':', '')))
24
class HostapdGlobal:
    """Client for the hostapd global control interface.

    Two wpaspy.Ctrl connections are opened: self.ctrl for request/response
    commands and self.mon, which is attached to receive events.
    """

    def __init__(self, apdev=None, global_ctrl_override=None):
        """Connect to the global control interface.

        apdev: device description.  If it is a mapping that provides both
            'hostname' and 'port', the connection goes to that remote
            address; in every other case (missing keys, a plain interface
            name string, or None) the local socket is used.
        global_ctrl_override: optional socket path replacing the default
            hapd_global path for the local case.
        """
        try:
            hostname = apdev['hostname']
            port = apdev['port']
        except (KeyError, TypeError):
            # KeyError: dict without remote info; TypeError: apdev is None
            # or a plain interface name string.
            hostname = None
            port = 8878
        self.host = remotehost.Host(hostname)
        self.hostname = hostname
        self.port = port
        if hostname is None:
            global_ctrl = global_ctrl_override or hapd_global
            self.ctrl = wpaspy.Ctrl(global_ctrl)
            self.mon = wpaspy.Ctrl(global_ctrl)
            self.dbg = ""
        else:
            self.ctrl = wpaspy.Ctrl(hostname, port)
            self.mon = wpaspy.Ctrl(hostname, port)
            self.dbg = hostname + "/" + str(port)
        self.mon.attach()

    def cmd_execute(self, cmd_array, shell=False):
        """Run a command and return (exit_status, output).

        Without a hostname the command is run as a local subprocess with
        stderr merged into stdout; with shell=True the array is joined
        with spaces into a single shell command line.  With a hostname the
        command is handed to self.host.execute() and 'shell' is not used.
        """
        if self.hostname is None:
            if shell:
                cmd = ' '.join(cmd_array)
            else:
                cmd = cmd_array
            proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                                    stdout=subprocess.PIPE, shell=shell)
            out = proc.communicate()[0]
            ret = proc.returncode
            return ret, out.decode()
        else:
            return self.host.execute(cmd_array)

    def request(self, cmd, timeout=10):
        """Send a global control interface command and return the reply."""
        logger.debug(self.dbg + ": CTRL(global): " + cmd)
        return self.ctrl.request(cmd, timeout)

    def wait_event(self, events, timeout):
        """Wait up to 'timeout' seconds for an event.

        Returns the first received event message that contains any of the
        substrings in 'events', or None if the time runs out first.
        """
        start = os.times()[4]
        while True:
            # Drain everything that is already queued before sleeping.
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.dbg + "(global): " + ev)
                for event in events:
                    if event in ev:
                        return ev
            now = os.times()[4]
            remaining = start + timeout - now
            if remaining <= 0:
                break
            if not self.mon.pending(timeout=remaining):
                break
        return None

    def add(self, ifname, driver=None):
        """Add an interface through the global interface.

        Raises Exception if the reply does not contain "OK".
        """
        cmd = "ADD " + ifname + " " + hapd_ctrl
        if driver:
            cmd += " " + driver
        res = self.request(cmd)
        if "OK" not in res:
            raise Exception("Could not add hostapd interface " + ifname)

    def add_iface(self, ifname, confname):
        """Add an interface that is configured from file 'confname'.

        Raises Exception if the reply does not contain "OK".
        """
        res = self.request("ADD " + ifname + " config=" + confname)
        if "OK" not in res:
            raise Exception("Could not add hostapd interface")

    def add_bss(self, phy, confname, ignore_error=False):
        """Add a BSS on 'phy' configured from file 'confname'.

        Raises Exception on failure unless ignore_error is set.
        """
        res = self.request("ADD bss_config=" + phy + ":" + confname)
        if "OK" not in res:
            if not ignore_error:
                raise Exception("Could not add hostapd BSS")

    def remove(self, ifname):
        """Issue REMOVE for 'ifname'; the reply is not checked."""
        self.request("REMOVE " + ifname, timeout=30)

    def relog(self):
        """Issue the RELOG command; the reply is not checked."""
        self.request("RELOG")

    def flush(self):
        """Issue the FLUSH command; the reply is not checked."""
        self.request("FLUSH")

    def get_ctrl_iface_port(self, ifname):
        """Return the UDP control port of 'ifname' on a remote hostapd.

        Returns None when this object talks to a local hostapd.  Raises
        Exception if the interface is not listed in the "INTERFACES ctrl"
        reply or its line has no "ctrl_iface=udp:" entry.
        """
        if self.hostname is None:
            return None

        res = self.request("INTERFACES ctrl")
        for line in res.splitlines():
            words = line.split()
            # Blank lines have no first word; skip them instead of failing.
            if words and words[0] == ifname:
                break
        else:
            raise Exception("Could not find UDP port for " + ifname)
        if "ctrl_iface=udp:" not in line:
            raise Exception("Wrong ctrl_interface format")
        # The port number is the text after the first ':' on the line.
        return int(line.split(":")[1])

    def terminate(self):
        """Detach the monitor and ask hostapd to terminate via self.ctrl."""
        self.mon.detach()
        self.mon.close()
        self.mon = None
        self.ctrl.terminate()
        self.ctrl = None

    def send_file(self, src, dst):
        """Transfer file 'src' to 'dst' through self.host."""
        self.host.send_file(src, dst)
141
class Hostapd:
    """Control and monitor connection to a single hostapd interface.

    self.ctrl carries request/response commands; self.mon is attached to
    the same endpoint to receive event messages.
    """

    def __init__(self, ifname, bssidx=0, hostname=None, port=8877):
        """Open the control connections for 'ifname'.

        Without a hostname the local socket hapd_ctrl/<ifname> is used;
        otherwise the connection goes to hostname:port.  'bssidx' selects
        which "bssid[N]" STATUS entry own_addr() reports.
        """
        self.hostname = hostname
        self.host = remotehost.Host(hostname, ifname)
        self.ifname = ifname
        if hostname is None:
            ctrl_path = os.path.join(hapd_ctrl, ifname)
            self.ctrl = wpaspy.Ctrl(ctrl_path)
            self.mon = wpaspy.Ctrl(ctrl_path)
            self.dbg = ifname
        else:
            self.ctrl = wpaspy.Ctrl(hostname, port)
            self.mon = wpaspy.Ctrl(hostname, port)
            self.dbg = hostname + "/" + ifname
        self.mon.attach()
        self.bssid = None
        self.bssidx = bssidx

    @staticmethod
    def _parse_fields(res):
        """Parse "name=value" lines into a dict (split on the first '=').

        Raises ValueError if a line contains no '='.
        """
        vals = dict()
        for l in res.splitlines():
            [name, value] = l.split('=', 1)
            vals[name] = value
        return vals

    def cmd_execute(self, cmd_array, shell=False):
        """Run a command and return (exit_status, output).

        Without a hostname the command is run as a local subprocess with
        stderr merged into stdout; with shell=True the array is joined
        with spaces into a single shell command line.  With a hostname the
        command is handed to self.host.execute() and 'shell' is not used.
        """
        if self.hostname is None:
            if shell:
                cmd = ' '.join(cmd_array)
            else:
                cmd = cmd_array
            proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT,
                                    stdout=subprocess.PIPE, shell=shell)
            out = proc.communicate()[0]
            ret = proc.returncode
            return ret, out.decode()
        else:
            return self.host.execute(cmd_array)

    def close_ctrl(self):
        """Detach and close both connections; a no-op once already closed."""
        if self.mon is not None:
            self.mon.detach()
            self.mon.close()
            self.mon = None
            self.ctrl.close()
            self.ctrl = None

    def own_addr(self):
        """Return the BSSID of this BSS, reading it from STATUS only once."""
        if self.bssid is None:
            self.bssid = self.get_status_field('bssid[%d]' % self.bssidx)
        return self.bssid

    def get_addr(self, group=False):
        """Return own_addr(); the 'group' argument is ignored."""
        return self.own_addr()

    def request(self, cmd):
        """Send a control interface command and return the reply."""
        logger.debug(self.dbg + ": CTRL: " + cmd)
        return self.ctrl.request(cmd)

    def ping(self):
        """Return True if hostapd answers PING with PONG."""
        return "PONG" in self.request("PING")

    def set(self, field, value):
        """Set a configuration parameter; raises Exception unless "OK"."""
        if "OK" not in self.request("SET " + field + " " + value):
            raise Exception("Failed to set hostapd parameter " + field)

    def set_defaults(self):
        """Apply the baseline configuration used by the set_* helpers."""
        self.set("driver", "nl80211")
        self.set("hw_mode", "g")
        self.set("channel", "1")
        self.set("ieee80211n", "1")
        self.set("logger_stdout", "-1")
        self.set("logger_stdout_level", "0")

    def set_open(self, ssid):
        """Configure an open network with the given SSID."""
        self.set_defaults()
        self.set("ssid", ssid)

    def set_wpa2_psk(self, ssid, passphrase):
        """Configure WPA2-PSK with CCMP."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "2")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("rsn_pairwise", "CCMP")

    def set_wpa_psk(self, ssid, passphrase):
        """Configure WPA-PSK with TKIP."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "1")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("wpa_pairwise", "TKIP")

    def set_wpa_psk_mixed(self, ssid, passphrase):
        """Configure mixed WPA/WPA2-PSK (TKIP for WPA, CCMP for RSN)."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wpa_passphrase", passphrase)
        self.set("wpa", "3")
        self.set("wpa_key_mgmt", "WPA-PSK")
        self.set("wpa_pairwise", "TKIP")
        self.set("rsn_pairwise", "CCMP")

    def set_wep(self, ssid, key):
        """Configure a WEP network using 'key' as wep_key0."""
        self.set_defaults()
        self.set("ssid", ssid)
        self.set("wep_key0", key)

    def enable(self):
        """Issue ENABLE; raises Exception unless the reply contains "OK"."""
        if "OK" not in self.request("ENABLE"):
            raise Exception("Failed to enable hostapd interface " + self.ifname)

    def disable(self):
        """Issue DISABLE; raises Exception unless the reply contains "OK"."""
        if "OK" not in self.request("DISABLE"):
            raise Exception("Failed to disable hostapd interface " + self.ifname)

    def dump_monitor(self):
        """Read and log every pending event message."""
        while self.mon.pending():
            ev = self.mon.recv()
            logger.debug(self.dbg + ": " + ev)

    def wait_event(self, events, timeout):
        """Wait up to 'timeout' seconds for an event.

        'events' must be a list of substrings.  Returns the first received
        event message containing any of them, or None on timeout.
        """
        if not isinstance(events, list):
            raise Exception("Hostapd.wait_event() called with incorrect events argument type")
        start = os.times()[4]
        while True:
            # Drain everything that is already queued before sleeping.
            while self.mon.pending():
                ev = self.mon.recv()
                logger.debug(self.dbg + ": " + ev)
                for event in events:
                    if event in ev:
                        return ev
            now = os.times()[4]
            remaining = start + timeout - now
            if remaining <= 0:
                break
            if not self.mon.pending(timeout=remaining):
                break
        return None

    def wait_sta(self, addr=None, timeout=2):
        """Wait for an AP-STA-CONNECT event.

        Raises Exception on timeout, or if 'addr' is given and does not
        appear in the event.
        """
        ev = self.wait_event(["AP-STA-CONNECT"], timeout=timeout)
        if ev is None:
            raise Exception("AP did not report STA connection")
        if addr and addr not in ev:
            raise Exception("Unexpected STA address in connection event: " + ev)

    def wait_ptkinitdone(self, addr, timeout=2):
        """Poll the STA entry every 0.1 s until hostapdWPAPTKState is "11".

        Raises Exception if the field is missing or the timeout expires.
        """
        while timeout > 0:
            sta = self.get_sta(addr)
            if 'hostapdWPAPTKState' not in sta:
                raise Exception("GET_STA did not return hostapdWPAPTKState")
            state = sta['hostapdWPAPTKState']
            # "11" is presumably the PTKINITDONE state value (see the
            # timeout message below) - TODO confirm against hostapd.
            if state == "11":
                return
            time.sleep(0.1)
            timeout -= 0.1
        raise Exception("Timeout while waiting for PTKINITDONE")

    def get_status(self):
        """Return the STATUS reply as a dict."""
        return self._parse_fields(self.request("STATUS"))

    def get_status_field(self, field):
        """Return one STATUS field, or None if it is not present."""
        vals = self.get_status()
        if field in vals:
            return vals[field]
        return None

    def get_driver_status(self):
        """Return the STATUS-DRIVER reply as a dict."""
        return self._parse_fields(self.request("STATUS-DRIVER"))

    def get_driver_status_field(self, field):
        """Return one STATUS-DRIVER field, or None if it is not present."""
        vals = self.get_driver_status()
        if field in vals:
            return vals[field]
        return None

    def get_config(self):
        """Return the GET_CONFIG reply as a dict."""
        return self._parse_fields(self.request("GET_CONFIG"))

    def mgmt_rx(self, timeout=5):
        """Wait for a MGMT-RX event and decode the reported frame.

        Returns None on timeout; otherwise a dict with the raw 'frame',
        the unpacked 24-byte header fields ('fc', 'subtype', 'duration',
        'da', 'sa', 'bssid', 'seq_ctrl') and the remaining 'payload'.
        """
        ev = self.wait_event(["MGMT-RX"], timeout=timeout)
        if ev is None:
            return None
        msg = {}
        # The hex dump of the frame is the second space-separated token.
        frame = binascii.unhexlify(ev.split(' ')[1])
        msg['frame'] = frame

        # fc, duration, 3 x 6 address octets, seq_ctrl -> 21 values.
        hdr = struct.unpack('<HH6B6B6BH', frame[0:24])
        mac_fmt = "%02x:%02x:%02x:%02x:%02x:%02x"
        msg['fc'] = hdr[0]
        msg['subtype'] = (hdr[0] >> 4) & 0xf
        msg['duration'] = hdr[1]
        msg['da'] = mac_fmt % hdr[2:8]
        msg['sa'] = mac_fmt % hdr[8:14]
        msg['bssid'] = mac_fmt % hdr[14:20]
        msg['seq_ctrl'] = hdr[20]
        msg['payload'] = frame[24:]

        return msg

    def mgmt_tx(self, msg):
        """Transmit a management frame built from 'msg'.

        Uses msg['fc'], 'da', 'sa', 'bssid' and 'payload'; the duration
        and sequence control header fields are sent as zero.  Raises
        Exception unless the reply contains "OK".
        """
        t = (msg['fc'], 0) + mac2tuple(msg['da']) + mac2tuple(msg['sa']) + mac2tuple(msg['bssid']) + (0,)
        hdr = struct.pack('<HH6B6B6BH', *t)
        res = self.request("MGMT_TX " + binascii.hexlify(hdr + msg['payload']).decode())
        if "OK" not in res:
            raise Exception("MGMT_TX command to hostapd failed")

    def get_sta(self, addr, info=None, next=False):
        """Query a station entry and return it as a dict.

        addr=None sends STA-FIRST; otherwise "STA <addr>" (or
        "STA-NEXT <addr>" when next is true), with 'info' appended if
        given.  A first line without '=' is stored under key 'addr'.
        """
        cmd = "STA-NEXT " if next else "STA "
        if addr is None:
            res = self.request("STA-FIRST")
        elif info:
            res = self.request(cmd + addr + " " + info)
        else:
            res = self.request(cmd + addr)
        lines = res.splitlines()
        vals = dict()
        first = True
        for l in lines:
            if first and '=' not in l:
                vals['addr'] = l
                first = False
            else:
                [name, value] = l.split('=', 1)
                vals[name] = value
        return vals

    def get_mib(self, param=None):
        """Return the MIB reply as a dict; lines without '=' are skipped."""
        if param:
            res = self.request("MIB " + param)
        else:
            res = self.request("MIB")
        lines = res.splitlines()
        vals = dict()
        for l in lines:
            name_val = l.split('=', 1)
            if len(name_val) > 1:
                vals[name_val[0]] = name_val[1]
        return vals

    def get_pmksa(self, addr):
        """Return the first PMKSA entry whose line contains 'addr'.

        The result has keys 'index', 'pmkid', 'expiration' and
        'opportunistic'; None is returned if no line matches.
        """
        res = self.request("PMKSA")
        lines = res.splitlines()
        for l in lines:
            if addr not in l:
                continue
            vals = dict()
            [index, aa, pmkid, expiration, opportunistic] = l.split(' ')
            vals['index'] = index
            vals['pmkid'] = pmkid
            vals['expiration'] = expiration
            vals['opportunistic'] = opportunistic
            return vals
        return None

    def dpp_qr_code(self, uri):
        """Issue DPP_QR_CODE for 'uri' and return the reply as an int."""
        res = self.request("DPP_QR_CODE " + uri)
        if "FAIL" in res:
            raise Exception("Failed to parse QR Code URI")
        return int(res)

    def dpp_bootstrap_gen(self, type="qrcode", chan=None, mac=None, info=None,
                          curve=None, key=None):
        """Issue DPP_BOOTSTRAP_GEN and return the reply as an int.

        mac=True substitutes own_addr(); colons are removed from the MAC
        address before it is sent.
        """
        cmd = "DPP_BOOTSTRAP_GEN type=" + type
        if chan:
            cmd += " chan=" + chan
        if mac:
            if mac is True:
                mac = self.own_addr()
            cmd += " mac=" + mac.replace(':', '')
        if info:
            cmd += " info=" + info
        if curve:
            cmd += " curve=" + curve
        if key:
            cmd += " key=" + key
        res = self.request(cmd)
        if "FAIL" in res:
            raise Exception("Failed to generate bootstrapping info")
        return int(res)

    def dpp_listen(self, freq, netrole=None, qr=None, role=None):
        """Issue DPP_LISTEN on 'freq'; raises Exception unless "OK"."""
        cmd = "DPP_LISTEN " + str(freq)
        if netrole:
            cmd += " netrole=" + netrole
        if qr:
            cmd += " qr=" + qr
        if role:
            cmd += " role=" + role
        if "OK" not in self.request(cmd):
            raise Exception("Failed to start listen operation")

    def dpp_auth_init(self, peer=None, uri=None, conf=None, configurator=None,
                      extra=None, own=None, role=None, neg_freq=None,
                      ssid=None, passphrase=None, expect_fail=False):
        """Issue DPP_AUTH_INIT.

        If 'peer' is None, 'uri' is first passed to dpp_qr_code() to get
        the peer id.  'ssid' and 'passphrase' are sent hex-encoded.  With
        expect_fail the command must be rejected; otherwise it must
        return "OK".  Raises Exception when the expectation is not met.
        """
        cmd = "DPP_AUTH_INIT"
        if peer is None:
            peer = self.dpp_qr_code(uri)
        cmd += " peer=%d" % peer
        if own is not None:
            cmd += " own=%d" % own
        if role:
            cmd += " role=" + role
        if extra:
            cmd += " " + extra
        if conf:
            cmd += " conf=" + conf
        if configurator is not None:
            cmd += " configurator=%d" % configurator
        if neg_freq:
            cmd += " neg_freq=%d" % neg_freq
        if ssid:
            cmd += " ssid=" + binascii.hexlify(ssid.encode()).decode()
        if passphrase:
            cmd += " pass=" + binascii.hexlify(passphrase.encode()).decode()
        res = self.request(cmd)
        if expect_fail:
            if "FAIL" not in res:
                raise Exception("DPP authentication started unexpectedly")
            return
        if "OK" not in res:
            raise Exception("Failed to initiate DPP Authentication")

    def dpp_pkex_init(self, identifier, code, role=None, key=None, curve=None,
                      extra=None, use_id=None):
        """Add PKEX data with init=1 and return the bootstrap id used.

        A new "pkex" bootstrap entry is generated unless 'use_id' is
        given.  Raises Exception if DPP_PKEX_ADD reports FAIL.
        """
        if use_id is None:
            id1 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
        else:
            id1 = use_id
        cmd = "own=%d " % id1
        if identifier:
            cmd += "identifier=%s " % identifier
        cmd += "init=1 "
        if role:
            cmd += "role=%s " % role
        if extra:
            cmd += extra + " "
        cmd += "code=%s" % code
        res = self.request("DPP_PKEX_ADD " + cmd)
        if "FAIL" in res:
            raise Exception("Failed to set PKEX data (initiator)")
        return id1

    def dpp_pkex_resp(self, freq, identifier, code, key=None, curve=None,
                      listen_role=None):
        """Add PKEX data without init=1 and start listening on 'freq'.

        Raises Exception if DPP_PKEX_ADD reports FAIL.
        """
        id0 = self.dpp_bootstrap_gen(type="pkex", key=key, curve=curve)
        cmd = "own=%d " % id0
        if identifier:
            cmd += "identifier=%s " % identifier
        cmd += "code=%s" % code
        res = self.request("DPP_PKEX_ADD " + cmd)
        if "FAIL" in res:
            raise Exception("Failed to set PKEX data (responder)")
        self.dpp_listen(freq, role=listen_role)

    def dpp_configurator_add(self, curve=None, key=None):
        """Issue DPP_CONFIGURATOR_ADD and return the reply as an int."""
        cmd = "DPP_CONFIGURATOR_ADD"
        if curve:
            cmd += " curve=" + curve
        if key:
            cmd += " key=" + key
        res = self.request(cmd)
        if "FAIL" in res:
            raise Exception("Failed to add configurator")
        return int(res)

    def dpp_configurator_remove(self, conf_id):
        """Issue DPP_CONFIGURATOR_REMOVE; raises Exception unless "OK"."""
        res = self.request("DPP_CONFIGURATOR_REMOVE %d" % conf_id)
        if "OK" not in res:
            raise Exception("DPP_CONFIGURATOR_REMOVE failed")

    def note(self, txt):
        """Send a NOTE command with 'txt'; the reply is not checked."""
        self.request("NOTE " + txt)

    def send_file(self, src, dst):
        """Transfer file 'src' to 'dst' through self.host."""
        self.host.send_file(src, dst)
533
def add_ap(apdev, params, wait_enabled=True, no_enable=False, timeout=30,
           global_ctrl_override=None, driver=False):
    """Add an AP interface, configure it from 'params' and return it.

    apdev: dict with 'ifname' and optionally 'hostname'/'port' for a
        remote hostapd, or (old style) a plain interface name string.
    params: dict of hostapd parameters; a list value causes one SET per
        list element.
    wait_enabled: wait for AP-ENABLED/AP-DISABLED after enabling.
    no_enable: return the configured Hostapd object without enabling it.
    timeout: seconds to wait for the AP to start.
    global_ctrl_override: alternative global control socket path.
    driver: optional driver name appended to the global ADD command.

    Raises Exception if hostapd does not answer PING, a parameter cannot
    be set, or the AP fails to start within the timeout.
    """
    hostname = None
    remote = None
    if isinstance(apdev, dict):
        ifname = apdev['ifname']
        try:
            hostname = apdev['hostname']
            # str() so that an integer port does not raise here and make
            # a remote AP look like a local one.
            remote = hostname + "/" + str(apdev['port'])
        except (KeyError, TypeError):
            hostname = None
            remote = None
        if remote is None:
            logger.info("Starting AP " + ifname)
        else:
            logger.info("Starting AP " + remote + " " + ifname)
    else:
        ifname = apdev
        logger.info("Starting AP " + ifname + " (old add_ap argument type)")
    hglobal = HostapdGlobal(apdev,
                            global_ctrl_override=global_ctrl_override)
    # Remove a possibly left-over interface first; the result is not checked.
    hglobal.remove(ifname)
    hglobal.add(ifname, driver=driver)
    port = hglobal.get_ctrl_iface_port(ifname)
    hapd = Hostapd(ifname, hostname=hostname, port=port)
    if not hapd.ping():
        raise Exception("Could not ping hostapd")
    hapd.set_defaults()
    # These fields are applied first and in this fixed order; everything
    # else follows in the iteration order of 'params'.
    fields = ["ssid", "wpa_passphrase", "nas_identifier", "wpa_key_mgmt",
              "wpa", "wpa_deny_ptk0_rekey",
              "wpa_pairwise", "rsn_pairwise", "auth_server_addr",
              "acct_server_addr", "osu_server_uri"]
    for field in fields:
        if field in params:
            hapd.set(field, params[field])
    for f, v in params.items():
        if f in fields:
            continue
        if isinstance(v, list):
            for val in v:
                hapd.set(f, val)
        else:
            hapd.set(f, v)
    if no_enable:
        return hapd
    hapd.enable()
    if wait_enabled:
        ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=timeout)
        if ev is None:
            raise Exception("AP startup timed out")
        if "AP-ENABLED" not in ev:
            raise Exception("AP startup failed")
    return hapd
585
def add_bss(apdev, ifname, confname, ignore_error=False):
    """Add a BSS from a configuration file and return its Hostapd object.

    The configuration name is passed through cfg_file() and the resulting
    file is sent to the host before the BSS is added.  Raises Exception if
    the BSS cannot be added (unless ignore_error) or does not answer PING.
    """
    phy = utils.get_phy(apdev)
    try:
        hostname = apdev['hostname']
        # str() so that an integer port does not raise here and make a
        # remote AP look like a local one.
        remote = hostname + "/" + str(apdev['port'])
    except (KeyError, TypeError):
        hostname = None
        remote = None
    if remote is None:
        logger.info("Starting BSS phy=" + phy + " ifname=" + ifname)
    else:
        logger.info("Starting BSS " + remote + " phy=" + phy + " ifname=" + ifname)
    hglobal = HostapdGlobal(apdev)
    confname = cfg_file(apdev, confname, ifname)
    hglobal.send_file(confname, confname)
    hglobal.add_bss(phy, confname, ignore_error)
    port = hglobal.get_ctrl_iface_port(ifname)
    hapd = Hostapd(ifname, hostname=hostname, port=port)
    if not hapd.ping():
        raise Exception("Could not ping hostapd")
    return hapd
605
def add_iface(apdev, confname):
    """Add an interface from a configuration file and return it.

    The configuration name is passed through cfg_file() and the resulting
    file is sent to the host before the interface is added.  Raises
    Exception if the interface cannot be added or does not answer PING.
    """
    ifname = apdev['ifname']
    try:
        hostname = apdev['hostname']
        # str() so that an integer port does not raise here and make a
        # remote AP look like a local one.
        remote = hostname + "/" + str(apdev['port'])
    except (KeyError, TypeError):
        hostname = None
        remote = None
    if remote is None:
        logger.info("Starting interface " + ifname)
    else:
        logger.info("Starting interface " + remote + " " + ifname)
    hglobal = HostapdGlobal(apdev)
    confname = cfg_file(apdev, confname, ifname)
    hglobal.send_file(confname, confname)
    hglobal.add_iface(ifname, confname)
    port = hglobal.get_ctrl_iface_port(ifname)
    hapd = Hostapd(ifname, hostname=hostname, port=port)
    if not hapd.ping():
        raise Exception("Could not ping hostapd")
    return hapd
625
def remove_bss(apdev, ifname=None):
    """Remove a BSS/interface through the global control interface.

    If 'ifname' is None, apdev['ifname'] is used.
    """
    if ifname is None:
        ifname = apdev['ifname']
    try:
        # str() keeps the remote form of the log line for integer ports.
        remote = apdev['hostname'] + "/" + str(apdev['port'])
    except (KeyError, TypeError):
        remote = None
    if remote is None:
        logger.info("Removing BSS " + ifname)
    else:
        logger.info("Removing BSS " + remote + " " + ifname)
    hglobal = HostapdGlobal(apdev)
    hglobal.remove(ifname)
637
def terminate(apdev):
    """Terminate the hostapd process that serves 'apdev'."""
    try:
        # str() keeps the remote form of the log line for integer ports.
        remote = apdev['hostname'] + "/" + str(apdev['port'])
    except (KeyError, TypeError):
        remote = None
    if remote is None:
        logger.info("Terminating hostapd")
    else:
        logger.info("Terminating hostapd " + remote)
    hglobal = HostapdGlobal(apdev)
    hglobal.terminate()
647
def wpa2_params(ssid=None, passphrase=None, wpa_key_mgmt="WPA-PSK",
                ieee80211w=None):
    """Return hostapd parameters for a WPA2 network using CCMP.

    'ssid' and 'passphrase' are included only when non-empty;
    'ieee80211w' is included whenever it is not None.
    """
    params = {"wpa": "2",
              "wpa_key_mgmt": wpa_key_mgmt,
              "rsn_pairwise": "CCMP"}
    if ssid:
        params["ssid"] = ssid
    if passphrase:
        params["wpa_passphrase"] = passphrase
    if ieee80211w is not None:
        params["ieee80211w"] = ieee80211w
    return params
660
def wpa_params(ssid=None, passphrase=None):
    """Return hostapd parameters for a WPA-PSK network using TKIP.

    'ssid' and 'passphrase' are included only when non-empty.
    """
    params = {"wpa": "1",
              "wpa_key_mgmt": "WPA-PSK",
              "wpa_pairwise": "TKIP"}
    if ssid:
        params["ssid"] = ssid
    if passphrase:
        params["wpa_passphrase"] = passphrase
    return params
670
def wpa_mixed_params(ssid=None, passphrase=None):
    """Return hostapd parameters for a mixed WPA/WPA2-PSK network.

    'ssid' and 'passphrase' are included only when non-empty.
    """
    params = {"wpa": "3",
              "wpa_key_mgmt": "WPA-PSK",
              "wpa_pairwise": "TKIP",
              "rsn_pairwise": "CCMP"}
    if ssid:
        params["ssid"] = ssid
    if passphrase:
        params["wpa_passphrase"] = passphrase
    return params
681
def radius_params():
    """Return a new dict with the RADIUS server settings used by the tests."""
    params = {"auth_server_addr": "127.0.0.1",
              "auth_server_port": "1812",
              "auth_server_shared_secret": "radius",
              "nas_identifier": "nas.w1.fi"}
    return params
688
def wpa_eap_params(ssid=None):
    """Return hostapd parameters for a WPA-EAP (TKIP) network.

    Starts from radius_params(); 'ssid' is included only when non-empty.
    """
    params = radius_params()
    params["wpa"] = "1"
    params["wpa_key_mgmt"] = "WPA-EAP"
    params["wpa_pairwise"] = "TKIP"
    params["ieee8021x"] = "1"
    if ssid:
        params["ssid"] = ssid
    return params
698
def wpa2_eap_params(ssid=None):
    """Return hostapd parameters for a WPA2-EAP (CCMP) network.

    Starts from radius_params(); 'ssid' is included only when non-empty.
    """
    params = radius_params()
    params["wpa"] = "2"
    params["wpa_key_mgmt"] = "WPA-EAP"
    params["rsn_pairwise"] = "CCMP"
    params["ieee8021x"] = "1"
    if ssid:
        params["ssid"] = ssid
    return params
708
def b_only_params(channel="1", ssid=None, country=None):
    """Return hostapd parameters for hw_mode=b on 'channel'.

    'ssid' and 'country' (as country_code) are included only when non-empty.
    """
    params = {"hw_mode": "b",
              "channel": channel}
    if ssid:
        params["ssid"] = ssid
    if country:
        params["country_code"] = country
    return params
717
def g_only_params(channel="1", ssid=None, country=None):
    """Return hostapd parameters for hw_mode=g on 'channel'.

    'ssid' and 'country' (as country_code) are included only when non-empty.
    """
    params = {"hw_mode": "g",
              "channel": channel}
    if ssid:
        params["ssid"] = ssid
    if country:
        params["country_code"] = country
    return params
726
def a_only_params(channel="36", ssid=None, country=None):
    """Return hostapd parameters for hw_mode=a on 'channel'.

    'ssid' and 'country' (as country_code) are included only when non-empty.
    """
    params = {"hw_mode": "a",
              "channel": channel}
    if ssid:
        params["ssid"] = ssid
    if country:
        params["country_code"] = country
    return params
735
def ht20_params(channel="1", ssid=None, country=None):
    """Return hostapd parameters with ieee80211n enabled on 'channel'.

    hw_mode is "g" for channel numbers up to 14 and "a" above that.
    'ssid' and 'country' (as country_code) are included only when
    non-empty.
    """
    params = {"ieee80211n": "1",
              "channel": channel,
              "hw_mode": "g"}
    if int(channel) > 14:
        params["hw_mode"] = "a"
    if ssid:
        params["ssid"] = ssid
    if country:
        params["country_code"] = country
    return params
747
def ht40_plus_params(channel="1", ssid=None, country=None):
    """Return ht20_params() with ht_capab set to "[HT40+]"."""
    params = ht20_params(channel, ssid, country)
    params['ht_capab'] = "[HT40+]"
    return params
752
def ht40_minus_params(channel="1", ssid=None, country=None):
    """Return ht20_params() with ht_capab set to "[HT40-]"."""
    params = ht20_params(channel, ssid, country)
    params['ht_capab'] = "[HT40-]"
    return params
757
def cmd_execute(apdev, cmd, shell=False):
    """Run 'cmd' via HostapdGlobal(apdev).cmd_execute() and return its result."""
    hglobal = HostapdGlobal(apdev)
    return hglobal.cmd_execute(cmd, shell=shell)
761
def send_file(apdev, src, dst):
    """Transfer file 'src' to 'dst' via HostapdGlobal(apdev).send_file()."""
    hglobal = HostapdGlobal(apdev)
    return hglobal.send_file(src, dst)
765
def acl_file(dev, apdev, conf):
    """Generate one of the known MAC ACL files under /tmp.

    For 'hostapd.macaddr', 'hostapd.accept' and 'hostapd.accept2' the
    file is written using the "address" status field of the stations in
    'dev' and the path of the written file is returned.  Any other name
    is returned unchanged and nothing is written.  'apdev' is not used.
    """
    filename = os.path.join("/tmp", conf)

    if conf == 'hostapd.macaddr':
        with open(filename, 'w') as f:
            mac0 = dev[0].get_status_field("address")
            f.write(mac0 + '\n')
            f.write("02:00:00:00:00:12\n")
            f.write("02:00:00:00:00:34\n")
            f.write("-02:00:00:00:00:12\n")
            f.write("-02:00:00:00:00:34\n")
            f.write("01:01:01:01:01:01\n")
            f.write("03:01:01:01:01:03\n")
    elif conf == 'hostapd.accept':
        with open(filename, 'w') as f:
            mac0 = dev[0].get_status_field("address")
            mac1 = dev[1].get_status_field("address")
            f.write(mac0 + " 1\n")
            f.write(mac1 + " 2\n")
    elif conf == 'hostapd.accept2':
        with open(filename, 'w') as f:
            mac0 = dev[0].get_status_field("address")
            mac1 = dev[1].get_status_field("address")
            mac2 = dev[2].get_status_field("address")
            f.write(mac0 + " 1\n")
            f.write(mac1 + " 2\n")
            f.write(mac2 + " 3\n")
    else:
        return conf

    return filename
797
def bssid_inc(apdev, inc=1):
    """Return apdev['bssid'] with its last octet increased by 'inc'.

    Only the last octet changes.  The sum wraps modulo 256 so the result
    always has a two-digit hex last octet; without the wrap a sum above
    0xff or below zero would be formatted as a malformed address such as
    "...:100" or "...:-1".
    """
    parts = apdev['bssid'].split(':')
    parts[5] = '%02x' % ((int(parts[5], 16) + int(inc)) % 256)
    return ':'.join(parts[:6])
804
def cfg_file(apdev, conf, ifname=None):
    """Generate a BSS configuration file under /tmp when 'conf' needs one.

    A name that starts with "bss-<digits>" gets a generated file and the
    path of that file is returned; any other name is returned unchanged
    and nothing is written.

    The BSS index is formed from all digits in 'conf'.  If 'ifname' is
    None it defaults to apdev['ifname'], with "-<index>" appended for any
    index other than "1".  The BSSID is apdev['bssid'] itself for
    'bss-2-dup.conf' and bssid_inc(apdev, index - 1) otherwise.
    """
    if not re.search(r'^bss-\d+', conf):
        return conf

    idx = ''.join(filter(str.isdigit, conf))
    if ifname is None:
        ifname = apdev['ifname']
        if idx != '1':
            ifname = ifname + '-' + idx

    if conf == 'bss-2-dup.conf':
        bssid = apdev['bssid']
    else:
        bssid = bssid_inc(apdev, int(idx) - 1)

    # Everything is computed before the file is opened so that a failure
    # above cannot leave a truncated configuration file behind.
    lines = ["driver=nl80211\n",
             "ctrl_interface=/var/run/hostapd\n",
             "hw_mode=g\n",
             "channel=1\n",
             "ieee80211n=1\n",
             "interface=%s\n" % ifname,
             "ssid=bss-%s\n" % idx,
             "bssid=%s\n" % bssid]

    # put cfg file in /tmp directory
    fname = os.path.join("/tmp", conf)
    with open(fname, 'w') as f:
        f.writelines(lines)

    return fname