]> git.ipfire.org Git - thirdparty/hostap.git/blob - wpa_supplicant/examples/p2p/p2p_flush.py
tests: Use python3 compatible "except" statement
[thirdparty/hostap.git] / wpa_supplicant / examples / p2p / p2p_flush.py
1 #!/usr/bin/python
2 # Tests P2P_Flush
3 # Will flush the p2p interface
4 # Then Program will exit
5 ######### MAY NEED TO RUN AS SUDO #############
6
7 import dbus
8 import sys, os
9 import time
10 import gobject
11 import threading
12 import getopt
13 from dbus.mainloop.glib import DBusGMainLoop
14
def usage():
    """Print command-line usage help for this script to stdout.

    The program name shown in the help text is taken from ``sys.argv[0]``.
    """
    prog = sys.argv[0]
    # Single-argument print(...) calls produce the same output on Python 2
    # (parenthesised expression) and Python 3 (function call).
    print("Usage:")
    # "\\ " emits a literal backslash followed by a space (a shell-style line
    # continuation in the help text); the unescaped "\ " form is an invalid
    # escape sequence that newer interpreters warn about.
    print(" %s -i <interface_name> \\ " % prog)
    print(" [-w <wpas_dbus_interface>]")
    print("Options:")
    print(" -i = interface name")
    print(" -w = wpas dbus interface = fi.w1.wpa_supplicant1")
    print("Example:")
    print(" %s -i wlan0" % prog)
25
# Required Signals
def deviceLost(devicepath):
    """Signal handler: report a lost device by printing its path to stdout.

    devicepath -- value delivered with the signal; it is formatted with %s.
    """
    # print(...) with one argument works on both Python 2 and Python 3.
    print("Device lost: %s" % (devicepath))
29
class P2P_Flush(threading.Thread):
    """Thread that calls Flush() on a wpa_supplicant P2PDevice D-Bus interface.

    The constructor connects to the system bus, looks up the object path of
    the requested network interface and registers the module-level
    ``deviceLost`` function as receiver for "DeviceLost" signals.  ``run()``
    issues the Flush() call and then runs a GLib main loop.

    Instance attributes:
        interface_name                  network interface name (e.g. wlan0)
        wpas_dbus_interface             base D-Bus name of wpa_supplicant
        timeout                         stored as given; not used by the class
        wpas_dbus_opath                 object path derived from the base name
        wpas_dbus_interfaces_opath      wpas_dbus_opath + "/Interfaces"
        wpas_dbus_interfaces_interface  base name + ".Interface"
        wpas_dbus_interfaces_p2pdevice  base name + ".Interface.P2PDevice"
        bus, wpas_object, wpas          system bus, root proxy and interface
        path                            object path returned by GetInterface()
        interface_object, p2p_interface proxy and P2PDevice interface for path
    """

    def __init__(self, interface_name, wpas_dbus_interface, timeout):
        """Connect to wpa_supplicant over D-Bus and prepare the flush thread.

        interface_name       -- name passed to GetInterface()
        wpas_dbus_interface  -- base D-Bus name, e.g. fi.w1.wpa_supplicant1
        timeout              -- stored on the instance for the caller's use

        If GetInterface() raises dbus.DBusException, an error message and
        the usage text are printed and the process is terminated.  Other
        exceptions raised while talking to D-Bus propagate to the caller.
        """
        self.interface_name = interface_name
        self.wpas_dbus_interface = wpas_dbus_interface
        self.timeout = timeout

        # Initializes thread; daemon mode allows for ctrl-c kill
        threading.Thread.__init__(self)
        self.daemon = True

        # Generating interface/object paths
        self._init_dbus_names()

        # Getting interfaces and objects
        DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
        self.wpas_object = self.bus.get_object(
            self.wpas_dbus_interface,
            self.wpas_dbus_opath)
        self.wpas = dbus.Interface(self.wpas_object,
                                   self.wpas_dbus_interface)

        # Try to see if supplicant knows about the interface
        try:
            self.path = self.wpas.GetInterface(self.interface_name)
        except dbus.DBusException:
            print('Error:\n Interface ' + self.interface_name
                  + ' was not found')
            usage()
            # os._exit() is kept (rather than sys.exit()) because the script's
            # caller wraps this constructor in a bare "except:" that would
            # swallow SystemExit.  It skips normal interpreter cleanup, so
            # flush stdout explicitly, and report failure with a non-zero
            # status.
            sys.stdout.flush()
            os._exit(1)

        self.interface_object = self.bus.get_object(
            self.wpas_dbus_interface, self.path)
        self.p2p_interface = dbus.Interface(
            self.interface_object,
            self.wpas_dbus_interfaces_p2pdevice)

        # Signals
        self.bus.add_signal_receiver(
            deviceLost,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="DeviceLost")

    def _init_dbus_names(self):
        """Derive object paths and interface names from wpas_dbus_interface."""
        base = self.wpas_dbus_interface
        # "fi.w1.wpa_supplicant1" -> "/fi/w1/wpa_supplicant1"
        self.wpas_dbus_opath = "/" + base.replace(".", "/")
        self.wpas_dbus_interfaces_opath = self.wpas_dbus_opath + "/Interfaces"
        # Misspelled attribute name used by earlier versions of this class;
        # kept as an alias so existing readers of it do not break.
        self.wpas_wpas_dbus_interfaces_opath = self.wpas_dbus_interfaces_opath
        self.wpas_dbus_interfaces_interface = base + ".Interface"
        self.wpas_dbus_interfaces_p2pdevice = (
            self.wpas_dbus_interfaces_interface + ".P2PDevice")

    # Runs p2p_flush
    def run(self):
        """Thread body: call Flush() on the P2PDevice, then run a main loop."""
        # Allows other threads to keep working while MainLoop runs
        # Required for timeout implementation
        # NOTE(review): statement order is preserved from the original;
        # confirm whether threads_init() should precede the first iteration.
        gobject.MainLoop().get_context().iteration(True)
        gobject.threads_init()
        self.p2p_interface.Flush()
        gobject.MainLoop().run()
109
110
if __name__ == "__main__":
    # Script entry point: parse options, start the P2P_Flush thread, wait
    # for `timeout` seconds so DeviceLost signals can be printed, then exit.

    # Seconds to wait after starting the flush;
    # needed to show which devices were lost
    timeout = 5
    # Defaults for optional inputs
    wpas_dbus_interface = 'fi.w1.wpa_supplicant1'

    # interface_name is required
    interface_name = None

    # Using getopt to handle options
    try:
        options, args = getopt.getopt(sys.argv[1:], "hi:w:")
    except getopt.GetoptError:
        usage()
        # sys.exit() instead of quit(): quit() is only installed by the
        # `site` module.  Non-zero status signals a command-line error.
        sys.exit(2)

    # If there's a switch, override the default option
    for key, value in options:
        if key == "-h":
            # Help
            usage()
            sys.exit(0)
        elif key == "-i":
            # Interface name
            interface_name = value
        elif key == "-w":
            # D-Bus interface
            wpas_dbus_interface = value
        else:
            # An explicit raise, unlike `assert`, is not stripped under -O.
            raise AssertionError("unhandled option")

    # Interface name is required and was not given
    if interface_name is None:
        print("Error:\n interface_name is required")
        usage()
        sys.exit(2)

    # Constructor
    try:
        p2p_flush_test = P2P_Flush(interface_name, wpas_dbus_interface,
                                   timeout)
    except Exception:
        # `except Exception` (not a bare except) so that ctrl-c and
        # SystemExit are not misreported as an invalid D-Bus interface.
        print("Error:\n Invalid wpas_dbus_interface")
        usage()
        sys.exit(1)

    # Start P2P_Flush
    p2p_flush_test.start()

    try:
        time.sleep(int(p2p_flush_test.timeout))
    except KeyboardInterrupt:
        # ctrl-c just ends the wait early
        pass

    print("p2p_flush complete")
    sys.exit(0)