]> git.ipfire.org Git - thirdparty/hostap.git/blob - tests/hwsim/netlink.py
tests: Fix ap_ft_reassoc_replay for case where wlantest has the PSK
[thirdparty/hostap.git] / tests / hwsim / netlink.py
1 #
2 # (Generic) Netlink message generation/parsing
3 # Copyright (c) 2007 Johannes Berg <johannes@sipsolutions.net>
4 # Copyright (c) 2014 Intel Corporation
5 #
6 # This software may be distributed under the terms of the BSD license.
7 # See README for more details.
8
9 import struct, socket
10
# Flag bits placed in the "flags" field of the netlink message header
NLM_F_REQUEST = 1 << 0
NLM_F_MULTI = 1 << 1
NLM_F_ACK = 1 << 2
NLM_F_ECHO = 1 << 3

# Values for the "type" field of the netlink message header
NLMSG_NOOP = 0x01
NLMSG_ERROR = 0x02
NLMSG_DONE = 0x03
NLMSG_OVERRUN = 0x04
NLMSG_MIN_TYPE = 0x10
23
class Attr(object):
    """A single netlink attribute: a numeric type plus a raw payload.

    The payload is either passed in directly as ``data`` or, when extra
    ``values`` are given, built with ``struct.pack(data, *values)`` (in
    which case ``data`` is the struct format string).
    """

    def __init__(self, attr_type, data, *values):
        self._type = attr_type
        if values:
            self._data = struct.pack(data, *values)
        else:
            self._data = data

    def _dump(self):
        """Return the serialized attribute: header, payload, zero padding."""
        length = len(self._data)
        # The length stored in the header covers header + payload, but not
        # the padding that follows.
        hdr = struct.pack("HH", length + 4, self._type)
        # Round the payload up to the next multiple of four bytes.
        pad = ((length + 4 - 1) & ~3) - length
        return hdr + self._data + b'\x00' * pad

    def __repr__(self):
        return '<Attr type %d, data "%s">' % (self._type, repr(self._data))

    def u16(self):
        """Return the payload unpacked as a native unsigned 16-bit integer."""
        return struct.unpack('H', self._data)[0]

    def s16(self):
        """Return the payload unpacked as a native signed 16-bit integer."""
        return struct.unpack('h', self._data)[0]

    def u32(self):
        """Return the payload unpacked as a native unsigned 32-bit integer."""
        return struct.unpack('I', self._data)[0]

    def s32(self):
        """Return the payload unpacked as a native signed 32-bit integer."""
        return struct.unpack('i', self._data)[0]

    def str(self):
        """Return the raw payload unchanged."""
        return self._data

    def nulstr(self):
        """Return the payload up to (not including) the first NUL."""
        # The separator must have the same type as the payload: splitting
        # bytes with a text separator raises TypeError on Python 3.
        if isinstance(self._data, (bytes, bytearray)):
            nul = b'\0'
        else:
            nul = '\0'
        return self._data.split(nul)[0]

    def nested(self):
        """Parse the payload as a sequence of attributes (see parse_attributes)."""
        return parse_attributes(self._data)
55
class StrAttr(Attr):
    """Attribute whose payload is the given string, without a terminator."""

    def __init__(self, attr_type, data):
        # The struct "s" format only packs bytes, so text is encoded first;
        # the length must be measured after encoding.
        if not isinstance(data, (bytes, bytearray)):
            data = data.encode('utf-8')
        Attr.__init__(self, attr_type, "%ds" % len(data), data)
59
class NulStrAttr(Attr):
    """Attribute whose payload is the given string followed by one NUL byte."""

    def __init__(self, attr_type, data):
        # The struct "s" format only packs bytes, so text is encoded first;
        # the length must be measured after encoding.
        if not isinstance(data, (bytes, bytearray)):
            data = data.encode('utf-8')
        Attr.__init__(self, attr_type, "%dsB" % len(data), data, 0)
63
class U32Attr(Attr):
    """Attribute whose payload is ``val`` packed with struct format "I"."""

    def __init__(self, attr_type, val):
        super(U32Attr, self).__init__(attr_type, "I", val)
67
class U8Attr(Attr):
    """Attribute whose payload is ``val`` packed with struct format "B"."""

    def __init__(self, attr_type, val):
        super(U8Attr, self).__init__(attr_type, "B", val)
71
class FlagAttr(Attr):
    """Attribute with an empty payload; only its type is serialized."""

    def __init__(self, attr_type):
        super(FlagAttr, self).__init__(attr_type, b"")
75
class Nested(Attr):
    """Attribute whose payload is the concatenation of other attributes.

    ``attrs`` is an iterable of objects providing ``_dump()``; they are
    serialized each time this attribute is dumped.
    """

    def __init__(self, attr_type, attrs):
        self.attrs = attrs
        self.type = attr_type

    def __repr__(self):
        # Attr.__repr__ reads _type/_data, which this class never sets.
        return '<Nested type %d, attrs %r>' % (self.type, self.attrs)

    def _dump(self):
        """Return the header followed by every child's serialized form."""
        # Children dump to bytes, so they must be joined with a bytes
        # separator; a text separator raises TypeError on Python 3.
        contents = b''.join(attr._dump() for attr in self.attrs)
        hdr = struct.pack("HH", len(contents) + 4, self.type)
        return hdr + contents
89
# Netlink protocol numbers, assigned consecutively starting at zero.
# Connection passes one of these as the protocol argument of socket().
(
    NETLINK_ROUTE,           # 0
    NETLINK_UNUSED,          # 1
    NETLINK_USERSOCK,        # 2
    NETLINK_FIREWALL,        # 3
    NETLINK_INET_DIAG,       # 4
    NETLINK_NFLOG,           # 5
    NETLINK_XFRM,            # 6
    NETLINK_SELINUX,         # 7
    NETLINK_ISCSI,           # 8
    NETLINK_AUDIT,           # 9
    NETLINK_FIB_LOOKUP,      # 10
    NETLINK_CONNECTOR,       # 11
    NETLINK_NETFILTER,       # 12
    NETLINK_IP6_FW,          # 13
    NETLINK_DNRTMSG,         # 14
    NETLINK_KOBJECT_UEVENT,  # 15
    NETLINK_GENERIC,         # 16
) = range(17)
107
class Message(object):
    """A netlink message: header fields plus a serialized payload.

    ``payload`` may be a list of objects providing ``_dump()`` (their
    output is concatenated), any other object (stored as given), or
    empty/None for an empty payload.
    """

    def __init__(self, msg_type, flags=0, seq=-1, payload=None):
        self.type = msg_type
        self.flags = flags
        self.seq = seq
        self.pid = -1
        if not payload:
            payload = []
        if isinstance(payload, list):
            payload = b"".join(item._dump() for item in payload)
        self.payload = payload

    def send(self, conn):
        """Serialize the message and hand it to ``conn.send()``.

        A sequence number is taken from ``conn.seq()`` unless one was set
        already; the pid is always taken from ``conn.pid``.
        """
        if self.seq == -1:
            self.seq = conn.seq()
        self.pid = conn.pid

        # The header is 16 bytes: one 32-bit length, two 16-bit fields and
        # two 32-bit fields; the length field counts header plus payload.
        total = len(self.payload) + 4 * 4
        header = struct.pack("IHHII", total, self.type, self.flags,
                             self.seq, self.pid)
        conn.send(header + self.payload)

    def __repr__(self):
        fields = (self.type, self.pid, self.seq, self.flags,
                  repr(self.payload))
        return '<netlink.Message type=%d, pid=%d, seq=%d, flags=0x%x "%s">' % fields

    @property
    def ret(self):
        """Return the signed integer at the start of an NLMSG_ERROR payload."""
        assert self.type == NLMSG_ERROR
        (code,) = struct.unpack("i", self.payload[:4])
        return code

    def send_and_recv(self, conn):
        """Send the message, then receive until a reply has the same seq."""
        self.send(conn)
        reply = conn.recv()
        while reply.seq != self.seq:
            reply = conn.recv()
        return reply
148
class Connection(object):
    """A bound raw AF_NETLINK socket with a per-connection sequence counter."""

    def __init__(self, nltype, groups=0, unexpected_msg_handler=None):
        sock = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, nltype)
        self.descriptor = sock
        # Request 64 KiB send and receive buffers.
        for option in (socket.SO_SNDBUF, socket.SO_RCVBUF):
            sock.setsockopt(socket.SOL_SOCKET, option, 65536)
        sock.bind((0, groups))
        # Read back the address actually assigned by bind().
        self.pid, self.groups = sock.getsockname()
        self._seq = 0
        self.unexpected = unexpected_msg_handler

    def send(self, msg):
        """Write already-serialized message bytes to the socket."""
        self.descriptor.send(msg)

    def recv(self):
        """Receive one datagram and return it as a Message.

        Raises OSError (with ``errno`` set) when the message is an
        NLMSG_ERROR carrying a negative code.
        """
        contents = self.descriptor.recv(16384)
        # XXX: python doesn't give us message flags, check
        #      len(contents) vs. msglen for TRUNC
        header, body = contents[:16], contents[16:]
        _msglen, msg_type, flags, seq, pid = struct.unpack("IHHII", header)
        msg = Message(msg_type, flags, seq, body)
        msg.pid = pid
        if msg.type != NLMSG_ERROR:
            return msg

        import os
        code = msg.ret
        if code >= 0:
            # A non-negative code is not treated as a failure
            return msg
        err = OSError("Netlink error: %s (%d)" % (os.strerror(-code), -code))
        err.errno = -code
        raise err

    def seq(self):
        """Advance the sequence counter and return the new value."""
        self._seq = self._seq + 1
        return self._seq
181
def parse_attributes(data):
    """Parse a buffer of serialized attributes into a dict keyed by type.

    Each attribute starts with a 4-byte header (16-bit length, 16-bit
    type); the length covers header plus payload, and the next attribute
    starts at the following multiple of four bytes.  When a type occurs
    more than once, the last occurrence wins.

    Raises ValueError if an attribute claims a length smaller than its
    own header, and struct.error if fewer than four bytes remain.
    """
    attrs = {}
    while len(data):
        attr_len, attr_type = struct.unpack("HH", data[:4])
        if attr_len < 4:
            # A length below the header size would never advance past
            # this attribute (a length of 0 loops forever), so reject it.
            raise ValueError("invalid netlink attribute length %d" % attr_len)
        attrs[attr_type] = Attr(attr_type, data[4:attr_len])
        # Skip the payload plus its padding to the next 4-byte boundary.
        data = data[(attr_len + 4 - 1) & ~3:]
    return attrs
190
191
192
# Generic netlink controller commands, numbered consecutively from zero
(
    CTRL_CMD_UNSPEC,     # 0
    CTRL_CMD_NEWFAMILY,  # 1
    CTRL_CMD_DELFAMILY,  # 2
    CTRL_CMD_GETFAMILY,  # 3
    CTRL_CMD_NEWOPS,     # 4
    CTRL_CMD_DELOPS,     # 5
    CTRL_CMD_GETOPS,     # 6
) = range(7)

# Generic netlink controller attribute types, numbered consecutively from zero
(
    CTRL_ATTR_UNSPEC,       # 0
    CTRL_ATTR_FAMILY_ID,    # 1
    CTRL_ATTR_FAMILY_NAME,  # 2
    CTRL_ATTR_VERSION,      # 3
    CTRL_ATTR_HDRSIZE,      # 4
    CTRL_ATTR_MAXATTR,      # 5
    CTRL_ATTR_OPS,          # 6
) = range(7)
208
class GenlHdr(object):
    """Generic netlink header: a command byte and a version byte."""

    def __init__(self, cmd, version=0):
        self.cmd, self.version = cmd, version

    def _dump(self):
        """Return the 4-byte header: cmd, version, two zero pad bytes."""
        fields = (self.cmd, self.version)
        return struct.pack("BBxx", *fields)
215
def _genl_hdr_parse(data):
    """Build a GenlHdr from a 4-byte buffer (cmd, version, two pad bytes)."""
    cmd, version = struct.unpack("BBxx", data)
    return GenlHdr(cmd, version)
218
# Message type used to address the generic netlink controller
GENL_ID_CTRL = NLMSG_MIN_TYPE
220
class GenlMessage(Message):
    """A Message whose payload is a GenlHdr followed by attributes.

    family -- value used as the netlink message type
    cmd    -- command stored in the generic netlink header
    attrs  -- optional iterable of attribute objects appended after the header
    flags  -- netlink message header flags
    """

    def __init__(self, family, cmd, attrs=None, flags=0):
        # None replaces a shared mutable default; list() lets callers pass
        # any iterable (tuple, generator), not only a list.
        payload = [GenlHdr(cmd)]
        if attrs is not None:
            payload.extend(attrs)
        Message.__init__(self, family, flags=flags, payload=payload)
224
class GenlController(object):
    """Queries the generic netlink controller over a given connection."""

    def __init__(self, conn):
        self.conn = conn

    def get_family_id(self, family):
        """Return the numeric ID of the generic netlink family ``family``.

        Sends CTRL_CMD_GETFAMILY with the family name and reads
        CTRL_ATTR_FAMILY_ID from the reply.  Raises KeyError if the reply
        carries no family ID attribute; errors reported while receiving
        propagate from the connection.
        """
        name = NulStrAttr(CTRL_ATTR_FAMILY_NAME, family)
        request = GenlMessage(GENL_ID_CTRL, CTRL_CMD_GETFAMILY,
                              flags=NLM_F_REQUEST, attrs=[name])
        # Match the reply by sequence number rather than trusting the first
        # message that happens to arrive on the socket.
        reply = request.send_and_recv(self.conn)
        # The first four bytes are the generic netlink header; attributes
        # follow it.
        attrs = parse_attributes(reply.payload[4:])
        return attrs[CTRL_ATTR_FAMILY_ID].u16()
236
# Shared controller instance, created when this module is imported;
# constructing the Connection opens and binds a NETLINK_GENERIC socket.
genl_controller = GenlController(
    Connection(NETLINK_GENERIC))