]> git.ipfire.org Git - thirdparty/hostap.git/blob - wpa_supplicant/examples/p2p/p2p_connect.py
tests: Use python3 compatible "except" statement
[thirdparty/hostap.git] / wpa_supplicant / examples / p2p / p2p_connect.py
1 #!/usr/bin/python
2 # Tests p2p_connect
3 # Will try to connect to another peer
4 # and form a group
5 ######### MAY NEED TO RUN AS SUDO #############
6
7 import dbus
8 import sys, os
9 import time
10 import gobject
11 import getopt
12 from dbus.mainloop.glib import DBusGMainLoop
13
14
def usage():
    """Print the command-line help text for this script to standard output.

    The program name shown in the synopsis and in the example line is taken
    from ``sys.argv[0]``.
    """
    prog = sys.argv[0]

    # Each print() is given exactly one argument so the call behaves the same
    # under Python 2 (print statement with a parenthesised expression) and
    # Python 3 (print function).
    # The trailing "\\ " is a literal backslash followed by a space; it is
    # spelled with an explicit escape so the text does not rely on an
    # unrecognised escape sequence.
    print("Usage:")
    print(" %s -i <interface_name> -m <wps_method> \\ " % prog)
    print(" -a <addr> [-p <pin>] [-g <go_intent>] \\ ")
    print(" [-w <wpas_dbus_interface>]")
    print("Options:")
    print(" -i = interface name")
    print(" -m = wps method")
    print(" -a = peer address")
    print(" -p = pin number (8 digits)")
    print(" -g = group owner intent")
    print(" -w = wpas dbus interface = fi.w1.wpa_supplicant1")
    print("Example:")
    print(" %s -i wlan0 -a 0015008352c0 -m display -p 12345670" % prog)
30
31
32 # Required Signals
def GONegotiationSuccess(status):
    """Signal callback: report that group-owner negotiation succeeded.

    Args:
        status: Payload delivered with the signal. It is accepted so the
            callback matches the signal signature but is not used.
    """
    # Single-argument print() works identically on Python 2 and Python 3.
    print("Go Negotiation Success")
35
def GONegotiationFailure(status):
    """Signal callback: report a failed group-owner negotiation and exit.

    Prints a fixed message followed by the formatted ``status`` value, then
    terminates the process immediately with exit status 0.

    Args:
        status: Payload delivered with the signal; printed via ``format()``.
    """
    # Single-argument print() works identically on Python 2 and Python 3.
    print('Go Negotiation Failed. Status:')
    print(format(status))
    # os._exit() skips normal interpreter shutdown and does not flush stdio
    # buffers, so flush explicitly or the messages above can be lost when
    # stdout is a pipe or a file.
    sys.stdout.flush()
    os._exit(0)
40
def GroupStarted(properties):
    """Signal callback: report a formed group and terminate the process.

    When ``properties`` contains the key ``"group_object"``, its value is
    printed and the process exits immediately with status 0. Otherwise the
    callback returns without doing anything.

    Args:
        properties: Mapping delivered with the signal.
    """
    # "in" replaces dict.has_key(), which no longer exists on Python 3.
    if "group_object" in properties:
        print('Group Formation Complete %s'
              % properties["group_object"])
        # os._exit() does not flush stdio buffers; flush so the message is
        # not lost when stdout is redirected.
        sys.stdout.flush()
        os._exit(0)
46
def WpsFailure(status, etc):
    """Signal callback: report a WPS failure and terminate the process.

    Prints the failure message including ``status``, then ``etc`` on its own
    line, and exits immediately with status 0.

    Args:
        status: First value delivered with the signal.
        etc: Second value delivered with the signal; printed as-is.
    """
    # The original format string had no placeholder, so .format(status)
    # silently discarded the status. "{0}" is used (rather than "{}") so the
    # string is also valid on Python 2.6.
    print("WPS Authentication Failure: {0}".format(status))
    print(etc)
    # os._exit() does not flush stdio buffers; flush so the messages are not
    # lost when stdout is redirected.
    sys.stdout.flush()
    os._exit(0)
51
class P2P_Connect():
    """Issue a P2P ``Connect`` request to wpa_supplicant over D-Bus.

    Typical use is: construct the object, call ``constructArguements()`` to
    validate the options and build the argument dictionary, then call
    ``run()`` to send the request and wait for signals.

    Attributes set by ``__init__``:
        ifname, wpas_dbus_interface, wps_method, go_intent, addr, pin:
            The constructor arguments, stored unchanged.
        wpas_dbus_opath: Object path derived from ``wpas_dbus_interface`` by
            replacing "." with "/" and prefixing "/".
        wpas_dbus_interfaces_opath: ``wpas_dbus_opath`` + "/Interfaces".
        wpas_dbus_interfaces_interface: ``wpas_dbus_interface`` +
            ".Interface".
        wpas_dbus_interfaces_p2pdevice: The previous value + ".P2PDevice".
        bus, wpas_object, wpas: System bus connection, the wpa_supplicant
            root object and its D-Bus interface proxy.
        path: Object path of the network interface inside wpa_supplicant.
        interface_object, p2p_interface: Proxy for ``path`` and its
            P2PDevice interface.

    Attribute set by ``constructArguements``:
        p2p_connect_arguements: Dictionary passed to ``Connect``.
    """

    # Constructor
    def __init__(self, ifname, wpas_dbus_interface, addr,
                 pin, wps_method, go_intent):
        """Connect to wpa_supplicant and register the signal callbacks.

        Args:
            ifname: Network interface name handed to ``GetInterface`` /
                ``CreateInterface``.
            wpas_dbus_interface: D-Bus name of the wpa_supplicant service,
                also used to derive the object path and interface names.
            addr: Peer address; appended to ``<path>/Peers/`` later on.
            pin: PIN string or None.
            wps_method: WPS method name; validated in
                ``constructArguements``.
            go_intent: Group owner intent (string or None).

        Raises:
            dbus.DBusException: For any D-Bus error other than the
                "interface unknown" / "interface exists" cases handled here.
        """
        # Initializes variables and threads
        self.ifname = ifname
        self.wpas_dbus_interface = wpas_dbus_interface
        self.wps_method = wps_method
        self.go_intent = go_intent
        self.addr = addr
        self.pin = pin

        # Generating interface/object paths
        self.wpas_dbus_opath = \
            "/" + self.wpas_dbus_interface.replace(".", "/")
        self.wpas_dbus_interfaces_opath = \
            self.wpas_dbus_opath + "/Interfaces"
        # Earlier revisions stored this value under a misspelled attribute
        # name; keep that name as an alias so existing users still work.
        self.wpas_wpas_dbus_interfaces_opath = \
            self.wpas_dbus_interfaces_opath
        self.wpas_dbus_interfaces_interface = \
            self.wpas_dbus_interface + ".Interface"
        self.wpas_dbus_interfaces_p2pdevice = \
            self.wpas_dbus_interfaces_interface + ".P2PDevice"

        # Getting interfaces and objects
        DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
        self.wpas_object = self.bus.get_object(
            self.wpas_dbus_interface,
            self.wpas_dbus_opath)
        self.wpas = dbus.Interface(
            self.wpas_object, self.wpas_dbus_interface)

        # See if wpa_supplicant already knows about this interface
        self.path = None
        try:
            self.path = self.wpas.GetInterface(ifname)
        except dbus.DBusException as unknown_exc:
            # Only an "interface unknown" error means we should try to
            # create the interface; anything else is re-raised unchanged.
            if not str(unknown_exc).startswith(
                    self.wpas_dbus_interface + ".InterfaceUnknown:"):
                raise
            try:
                # Store the result on self.path: it is needed below to build
                # the interface proxy.
                self.path = self.wpas.CreateInterface(
                    {'Ifname': ifname, 'Driver': 'test'})
                time.sleep(1)
            except dbus.DBusException as exists_exc:
                if not str(exists_exc).startswith(
                        self.wpas_dbus_interface + ".InterfaceExists:"):
                    raise
                # The interface exists after all, so look its path up again
                # instead of continuing with self.path left as None.
                self.path = self.wpas.GetInterface(ifname)

        # Get Interface and objects
        self.interface_object = self.bus.get_object(
            self.wpas_dbus_interface, self.path)
        self.p2p_interface = dbus.Interface(
            self.interface_object,
            self.wpas_dbus_interfaces_p2pdevice)

        # Add signals
        self.bus.add_signal_receiver(
            GONegotiationSuccess,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="GONegotiationSuccess")
        self.bus.add_signal_receiver(
            GONegotiationFailure,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="GONegotiationFailure")
        self.bus.add_signal_receiver(
            GroupStarted,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="GroupStarted")
        self.bus.add_signal_receiver(
            WpsFailure,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="WpsFailed")

    # Constructing all the arguments needed to connect
    def constructArguements(self):
        """Validate the options and build ``self.p2p_connect_arguements``.

        Rules applied per ``wps_method``:
            display: a PIN is required; ``go_intent`` is forced to '15'.
            keypad:  a PIN is required; ``go_intent`` must not be 15.
            pin:     any supplied PIN is ignored.
            pbc:     any supplied PIN is ignored.
        Any other method is rejected.

        On a validation error an error message and the usage text are
        printed and the process exits with status 1 (via ``SystemExit``).
        """
        # Adding required arguments
        self.p2p_connect_arguements = {
            'wps_method': self.wps_method,
            'peer': dbus.ObjectPath(self.path + '/Peers/' + self.addr),
        }

        # Display requires a pin, and a go intent of 15
        if self.wps_method == 'display':
            if self.pin is not None:
                self.p2p_connect_arguements.update({'pin': self.pin})
            else:
                print("Error:\n Pin required for wps_method=display")
                usage()
                sys.exit(1)

            if self.go_intent is not None and int(self.go_intent) != 15:
                print("go_intent overwritten to 15")

            self.go_intent = '15'

        # Keypad requires a pin, and a go intent of less than 15
        elif self.wps_method == 'keypad':
            if self.pin is not None:
                self.p2p_connect_arguements.update({'pin': self.pin})
            else:
                print("Error:\n Pin required for wps_method=keypad")
                usage()
                sys.exit(1)

            if self.go_intent is not None and int(self.go_intent) == 15:
                error = "Error :\n Group Owner intent cannot be" + \
                    " 15 for wps_method=keypad"
                print(error)
                usage()
                sys.exit(1)

        # Doesn't require pin
        # for ./wpa_cli, p2p_connect [mac] [pin#], wps_method=keypad
        elif self.wps_method == 'pin':
            if self.pin is not None:
                print("pin ignored")

        # No pin is required for pbc so it is ignored
        elif self.wps_method == 'pbc':
            if self.pin is not None:
                print("pin ignored")

        else:
            print("Error:\n wps_method not supported or does not exist")
            usage()
            sys.exit(1)

        # Go_intent is optional for all arguments
        if self.go_intent is not None:
            self.p2p_connect_arguements.update(
                {'go_intent': dbus.Int32(self.go_intent)})

    # Running p2p_connect
    def run(self):
        """Send the ``Connect`` request and enter the GLib main loop.

        ``constructArguements()`` must have been called first so that
        ``self.p2p_connect_arguements`` exists. When ``wps_method`` is 'pin'
        and no PIN was put into the arguments, the value returned by
        ``Connect`` is printed as the PIN. The main loop is then run so the
        registered signal callbacks can fire.

        Raises:
            dbus.DBusException: Propagated unchanged if ``Connect`` fails.
        """
        result_pin = self.p2p_interface.Connect(
            self.p2p_connect_arguements)

        if (self.wps_method == 'pin' and
                'pin' not in self.p2p_connect_arguements):
            print("Connect return with pin value of %d " % int(result_pin))

        gobject.MainLoop().run()
219
if __name__ == "__main__":
    # Script entry point: parse the command line, validate the options,
    # build a P2P_Connect object and run it. Every usage error prints a
    # message plus the usage text and exits with status 1; "-h" exits 0.

    # Required
    interface_name = None
    wps_method = None
    addr = None

    # Conditionally optional
    pin = None

    # Optional
    wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
    go_intent = None

    # Using getopts to handle options
    try:
        options, args = getopt.getopt(sys.argv[1:], "hi:m:a:p:g:w:")
    except getopt.GetoptError:
        usage()
        sys.exit(1)

    # If there's a switch, override default option
    for key, value in options:
        # Help
        if key == "-h":
            usage()
            sys.exit(0)
        # Interface Name
        elif key == "-i":
            interface_name = value
        # WPS Method
        elif key == "-m":
            wps_method = value
        # Address
        elif key == "-a":
            addr = value
        # Pin
        elif key == "-p":
            pin = value
        # Group Owner Intent
        elif key == "-g":
            go_intent = value
        # Dbus interface
        elif key == "-w":
            wpas_dbus_interface = value
        else:
            # getopt only returns options named in the option string, so
            # this is a programming error. Raise explicitly because a bare
            # assert is stripped under "python -O".
            raise AssertionError("unhandled option")

    # Required Arguments check
    if interface_name is None or wps_method is None or addr is None:
        print("Error:\n Required arguements not specified")
        usage()
        sys.exit(1)

    # Group Owner Intent Check
    if go_intent is not None:
        try:
            intent_in_range = 0 <= int(go_intent) <= 15
        except ValueError:
            # A non-numeric value is reported like an out-of-range one
            # instead of crashing with a traceback.
            intent_in_range = False
        if not intent_in_range:
            print("Error:\n Group Owner Intent must be between 0 and 15 inclusive")
            usage()
            sys.exit(1)

    # Pin Check: exactly eight characters, all of them digits
    if pin is not None and (len(pin) != 8 or not pin.isdigit()):
        print("Error:\n Pin is not 8 digits")
        usage()
        sys.exit(1)

    try:
        p2p_connect_test = P2P_Connect(interface_name, wpas_dbus_interface,
                                       addr, pin, wps_method, go_intent)
    except Exception as exc:
        # Catch Exception rather than using a bare except so SystemExit and
        # KeyboardInterrupt pass through, and show the underlying error so
        # the failure can be diagnosed.
        print("Error:\n Invalid Arguements")
        print(" %s" % exc)
        usage()
        sys.exit(1)

    p2p_connect_test.constructArguements()
    p2p_connect_test.run()

    # os._exit() does not flush stdio buffers, so flush first.
    sys.stdout.flush()
    os._exit(0)