]> git.ipfire.org Git - thirdparty/hostap.git/blob - wpaspy/wpaspy.py
The master branch is now used for v2.10 development
[thirdparty/hostap.git] / wpaspy / wpaspy.py
1 #!/usr/bin/python
2 #
3 # wpa_supplicant/hostapd control interface using Python
4 # Copyright (c) 2013, Jouni Malinen <j@w1.fi>
5 #
6 # This software may be distributed under the terms of the BSD license.
7 # See README for more details.
8
9 import os
10 import stat
11 import socket
12 import select
13
# Per-process sequence number appended to the local UNIX socket path so that
# every Ctrl instance created by this process binds to a distinct file.
counter = 0


class Ctrl:
    """Client for a wpa_supplicant/hostapd control interface.

    If ``path`` names an existing UNIX domain socket, commands are exchanged
    over an AF_UNIX datagram socket bound to a private file under ``/tmp``.
    Otherwise ``path`` is treated as a host name and the interface is reached
    over UDP on ``port``; a cookie is fetched with ``GET_COOKIE`` and
    prefixed to every later command.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, path, port=9877):
        """Open the control connection.

        path -- UNIX socket path, or host name for the UDP transport
        port -- UDP port (only used for the UDP transport)

        Any exception raised while connecting is propagated after the
        partially opened socket has been cleaned up.
        """
        # These two flags are set first so that close()/__del__ stay safe
        # even when the rest of the constructor raises.
        self.started = False
        self.attached = False
        self.path = path
        self.port = port
        self.s = None

        try:
            self.udp = not stat.S_ISSOCK(os.stat(path).st_mode)
        except Exception:
            # Anything that cannot be stat()ed (missing file, host name,
            # ...) is deliberately treated as a UDP destination.
            self.udp = True

        if self.udp:
            self._open_udp(path, port)
        else:
            self._open_unix(path)
        self.started = True

    def _open_unix(self, path):
        """Bind a private local datagram socket and connect it to path."""
        global counter
        self.dest = path
        self.local = "/tmp/wpa_ctrl_" + str(os.getpid()) + '-' + str(counter)
        counter += 1
        self.s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            self.s.bind(self.local)
        except Exception:
            # bind() did not create our file, so only the socket is released.
            self.s.close()
            raise
        try:
            self.s.connect(self.dest)
        except Exception:
            self.s.close()
            os.unlink(self.local)
            raise

    def _open_udp(self, path, port):
        """Create the UDP socket and fetch the session cookie."""
        try:
            ai_list = socket.getaddrinfo(path, port, socket.AF_INET,
                                         socket.SOCK_DGRAM)
            af, socktype, _proto, _cn, sockaddr = ai_list[0]
            self.sockaddr = sockaddr
            self.s = socket.socket(af, socktype)
            self.s.settimeout(5)
            self.s.sendto(b"GET_COOKIE", sockaddr)
            reply, _server = self.s.recvfrom(4096)
            self.cookie = reply
        except BaseException:
            # Release the socket on every failure (including interrupts)
            # and let the caller see the original exception.
            print("connect exception ", path, str(port))
            if self.s is not None:
                self.s.close()
            raise

    @staticmethod
    def _decode(data):
        """Convert a received datagram to text without ever raising.

        Bytes that are not valid in the default encoding are replaced with
        U+FFFD instead of aborting the whole receive operation.
        """
        try:
            text = data.decode()
        except UnicodeDecodeError:
            text = data.decode("utf-8", "replace")
        try:
            # No-op on Python 3; gives Python 2 callers a native str when
            # the reply is plain ASCII.
            return str(text)
        except UnicodeError:
            return text

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def __del__(self):
        self.close()

    def close(self):
        """Detach if needed, close the socket and remove the local file.

        Safe to call more than once.
        """
        if self.attached:
            try:
                self.detach()
            except Exception:
                # Need to ignore this to allow the socket to be closed
                self.attached = False
        if self.started:
            # Cleared first so a failure below cannot cause a second
            # cleanup attempt from __del__.
            self.started = False
            self.s.close()
            if not self.udp:
                try:
                    os.unlink(self.local)
                except OSError:
                    # Best-effort cleanup: the socket file may already
                    # have been removed.
                    pass

    def request(self, cmd, timeout=10):
        """Send cmd and return the reply as text.

        cmd     -- command as str or bytes
        timeout -- seconds to wait for the reply

        Raises Exception("Timeout on waiting response") if nothing is
        received within timeout seconds.
        """
        if isinstance(cmd, str):
            try:
                cmd = cmd.encode()
            except UnicodeDecodeError:
                # Python 2 byte string with non-ASCII data: send it as is.
                pass
        if self.udp:
            self.s.sendto(self.cookie + cmd, self.sockaddr)
        else:
            self.s.send(cmd)
        readable, _, _ = select.select([self.s], [], [], timeout)
        if readable:
            return self._decode(self.s.recv(4096))
        raise Exception("Timeout on waiting response")

    def attach(self):
        """Send ATTACH; raises Exception("ATTACH failed") unless the reply
        contains "OK". Does nothing when already attached."""
        if self.attached:
            return None
        res = self.request("ATTACH")
        if "OK" in res:
            self.attached = True
            return None
        raise Exception("ATTACH failed")

    def detach(self):
        """Discard pending messages and send DETACH.

        Raises Exception("DETACH failed") if the reply contains "FAIL".
        Does nothing when not attached.
        """
        if not self.attached:
            return None
        # Drain queued messages so they are not taken for the DETACH reply.
        while self.pending():
            self.recv()
        res = self.request("DETACH")
        if "FAIL" not in res:
            self.attached = False
            return None
        raise Exception("DETACH failed")

    def terminate(self):
        """Detach if attached, send TERMINATE and close the connection."""
        if self.attached:
            try:
                self.detach()
            except Exception:
                # Need to ignore this to allow the socket to be closed
                self.attached = False
        self.request("TERMINATE")
        self.close()

    def pending(self, timeout=0):
        """Return True if a message can be read within timeout seconds."""
        readable, _, _ = select.select([self.s], [], [], timeout)
        return bool(readable)

    def recv(self):
        """Receive one message (blocking) and return it as text."""
        return self._decode(self.s.recv(4096))