]>
git.ipfire.org Git - thirdparty/hostap.git/blob - wpa_supplicant/eapol_test.py
3 # eapol_test controller
4 # Copyright (c) 2015, Jouni Malinen <j@w1.fi>
6 # This software may be distributed under the terms of the BSD license.
7 # See README for more details.
16 logger
= logging
.getLogger()
17 dir = os
.path
.dirname(os
.path
.realpath(sys
.modules
[__name__
].__file
__))
18 sys
.path
.append(os
.path
.join(dir, '..', 'wpaspy'))
20 wpas_ctrl
= '/tmp/eapol_test'
23 def __init__(self
, ifname
):
25 self
.ctrl
= wpaspy
.Ctrl(os
.path
.join(wpas_ctrl
, ifname
))
26 if "PONG" not in self
.ctrl
.request("PING"):
27 raise Exception("Failed to connect to eapol_test (%s)" % ifname
)
28 self
.mon
= wpaspy
.Ctrl(os
.path
.join(wpas_ctrl
, ifname
))
31 def add_network(self
):
32 id = self
.request("ADD_NETWORK")
34 raise Exception("ADD_NETWORK failed")
37 def remove_network(self
, id):
38 id = self
.request("REMOVE_NETWORK " + str(id))
40 raise Exception("REMOVE_NETWORK failed")
43 def set_network(self
, id, field
, value
):
44 res
= self
.request("SET_NETWORK " + str(id) + " " + field
+ " " + value
)
46 raise Exception("SET_NETWORK failed")
49 def set_network_quoted(self
, id, field
, value
):
50 res
= self
.request("SET_NETWORK " + str(id) + " " + field
+ ' "' + value
+ '"')
52 raise Exception("SET_NETWORK failed")
55 def request(self
, cmd
, timeout
=10):
56 return self
.ctrl
.request(cmd
, timeout
=timeout
)
58 def wait_event(self
, events
, timeout
=10):
61 while self
.mon
.pending():
63 logger
.debug(self
.ifname
+ ": " + ev
)
68 remaining
= start
+ timeout
- now
71 if not self
.mon
.pending(timeout
=remaining
):
75 def run(ifname
, count
, no_fast_reauth
, res
):
76 et
= eapol_test(ifname
)
78 et
.request("AP_SCAN 0")
80 et
.request("SET fast_reauth 0")
82 et
.request("SET fast_reauth 1")
84 et
.set_network(id, "key_mgmt", "IEEE8021X")
85 et
.set_network(id, "eapol_flags", "0")
86 et
.set_network(id, "eap", "TLS")
87 et
.set_network_quoted(id, "identity", "user")
88 et
.set_network_quoted(id, "ca_cert", 'ca.pem')
89 et
.set_network_quoted(id, "client_cert", 'client.pem')
90 et
.set_network_quoted(id, "private_key", 'client.key')
91 et
.set_network_quoted(id, "private_key_passwd", 'whatever')
92 et
.set_network(id, "disabled", "0")
95 for i
in range(count
):
96 et
.request("REASSOCIATE")
97 ev
= et
.wait_event(["CTRL-EVENT-CONNECTED", "CTRL-EVENT-EAP-FAILURE"])
98 if ev
is None or "CTRL-EVENT-CONNECTED" not in ev
:
102 et
.remove_network(id)
105 res
.put("FAIL (%d OK)" % i
)
107 res
.put("PASS %d" % (i
+ 1))
110 parser
= argparse
.ArgumentParser(description
='eapol_test controller')
111 parser
.add_argument('--ctrl', help='control interface directory')
112 parser
.add_argument('--num', help='number of processes')
113 parser
.add_argument('--iter', help='number of iterations')
114 parser
.add_argument('--no-fast-reauth', action
='store_true',
115 dest
='no_fast_reauth',
116 help='disable TLS session resumption')
117 args
= parser
.parse_args()
120 iter = int(args
.iter)
123 wpas_ctrl
= args
.ctrl
128 res
[i
] = Queue
.Queue()
129 t
[i
] = threading
.Thread(target
=run
, args
=(str(i
), iter,
130 args
.no_fast_reauth
, res
[i
]))
136 results
= res
[i
].get(False)
139 print("%d: %s" % (i
, results
))
141 if __name__
== "__main__":