]> git.ipfire.org Git - thirdparty/kernel/stable.git/blame - net/tipc/group.c
tipc: introduce communication groups
[thirdparty/kernel/stable.git] / net / tipc / group.c
CommitLineData
75da2163
JM
1/*
2 * net/tipc/group.c: TIPC group messaging code
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "core.h"
37#include "addr.h"
38#include "group.h"
39#include "bcast.h"
40#include "server.h"
41#include "msg.h"
42#include "socket.h"
43#include "node.h"
44#include "name_table.h"
45#include "subscr.h"
46
/* ADV_UNIT is the number of FLOWCTL_BLK_SZ-sized blocks spanned by a
 * maximum-size message plus a maximum-size header (integer division),
 * plus one extra block. ADV_IDLE is an alias for the same value.
 * Neither macro is referenced in the part of the file reviewed here.
 */
#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
#define ADV_IDLE ADV_UNIT
49
/* States of a group member as tracked by this file.
 *
 * The numeric order is significant: tipc_group_is_receiver() accepts
 * every state that compares >= MBR_JOINED.
 */
enum mbr_state {
	MBR_QUARANTINED = 0,	/* created by a JOIN msg from an unknown member */
	MBR_DISCOVERED,		/* created, neither JOIN nor PUBLISH handled yet */
	MBR_JOINING,		/* JOIN msg received while DISCOVERED */
	MBR_PUBLISHED,		/* PUBLISH event received while DISCOVERED */
	MBR_JOINED,		/* JOIN msg and PUBLISH event both received */
	MBR_LEAVING		/* first of LEAVE msg / WITHDRAW event received */
};
58
59struct tipc_member {
60 struct rb_node tree_node;
61 struct list_head list;
62 u32 node;
63 u32 port;
64 enum mbr_state state;
65 u16 bc_rcv_nxt;
66};
67
68struct tipc_group {
69 struct rb_root members;
70 struct tipc_nlist dests;
71 struct net *net;
72 int subid;
73 u32 type;
74 u32 instance;
75 u32 domain;
76 u32 scope;
77 u32 portid;
78 u16 member_cnt;
79 u16 bc_snd_nxt;
80 bool loopback;
81};
82
/* Forward declaration: tipc_group_delete() calls this helper before its
 * definition further down in the file.
 */
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
				  int mtyp, struct sk_buff_head *xmitq);
85
86u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
87{
88 return grp->bc_snd_nxt;
89}
90
91static bool tipc_group_is_receiver(struct tipc_member *m)
92{
93 return m && m->state >= MBR_JOINED;
94}
95
96int tipc_group_size(struct tipc_group *grp)
97{
98 return grp->member_cnt;
99}
100
101struct tipc_group *tipc_group_create(struct net *net, u32 portid,
102 struct tipc_group_req *mreq)
103{
104 struct tipc_group *grp;
105 u32 type = mreq->type;
106
107 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
108 if (!grp)
109 return NULL;
110 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
111 grp->members = RB_ROOT;
112 grp->net = net;
113 grp->portid = portid;
114 grp->domain = addr_domain(net, mreq->scope);
115 grp->type = type;
116 grp->instance = mreq->instance;
117 grp->scope = mreq->scope;
118 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
119 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
120 return grp;
121 kfree(grp);
122 return NULL;
123}
124
125void tipc_group_delete(struct net *net, struct tipc_group *grp)
126{
127 struct rb_root *tree = &grp->members;
128 struct tipc_member *m, *tmp;
129 struct sk_buff_head xmitq;
130
131 __skb_queue_head_init(&xmitq);
132
133 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
134 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
135 list_del(&m->list);
136 kfree(m);
137 }
138 tipc_node_distr_xmit(net, &xmitq);
139 tipc_nlist_purge(&grp->dests);
140 tipc_topsrv_kern_unsubscr(net, grp->subid);
141 kfree(grp);
142}
143
144struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
145 u32 node, u32 port)
146{
147 struct rb_node *n = grp->members.rb_node;
148 u64 nkey, key = (u64)node << 32 | port;
149 struct tipc_member *m;
150
151 while (n) {
152 m = container_of(n, struct tipc_member, tree_node);
153 nkey = (u64)m->node << 32 | m->port;
154 if (key < nkey)
155 n = n->rb_left;
156 else if (key > nkey)
157 n = n->rb_right;
158 else
159 return m;
160 }
161 return NULL;
162}
163
164static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
165 u32 node)
166{
167 struct tipc_member *m;
168 struct rb_node *n;
169
170 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
171 m = container_of(n, struct tipc_member, tree_node);
172 if (m->node == node)
173 return m;
174 }
175 return NULL;
176}
177
178static void tipc_group_add_to_tree(struct tipc_group *grp,
179 struct tipc_member *m)
180{
181 u64 nkey, key = (u64)m->node << 32 | m->port;
182 struct rb_node **n, *parent = NULL;
183 struct tipc_member *tmp;
184
185 n = &grp->members.rb_node;
186 while (*n) {
187 tmp = container_of(*n, struct tipc_member, tree_node);
188 parent = *n;
189 tmp = container_of(parent, struct tipc_member, tree_node);
190 nkey = (u64)tmp->node << 32 | tmp->port;
191 if (key < nkey)
192 n = &(*n)->rb_left;
193 else if (key > nkey)
194 n = &(*n)->rb_right;
195 else
196 return;
197 }
198 rb_link_node(&m->tree_node, parent, n);
199 rb_insert_color(&m->tree_node, &grp->members);
200}
201
202static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
203 u32 node, u32 port,
204 int state)
205{
206 struct tipc_member *m;
207
208 m = kzalloc(sizeof(*m), GFP_ATOMIC);
209 if (!m)
210 return NULL;
211 INIT_LIST_HEAD(&m->list);
212 m->node = node;
213 m->port = port;
214 grp->member_cnt++;
215 tipc_group_add_to_tree(grp, m);
216 tipc_nlist_add(&grp->dests, m->node);
217 m->state = state;
218 return m;
219}
220
221void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
222{
223 tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
224}
225
226static void tipc_group_delete_member(struct tipc_group *grp,
227 struct tipc_member *m)
228{
229 rb_erase(&m->tree_node, &grp->members);
230 grp->member_cnt--;
231 list_del_init(&m->list);
232
233 /* If last member on a node, remove node from dest list */
234 if (!tipc_group_find_node(grp, m->node))
235 tipc_nlist_del(&grp->dests, m->node);
236
237 kfree(m);
238}
239
240struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
241{
242 return &grp->dests;
243}
244
245void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
246 int *scope)
247{
248 seq->type = grp->type;
249 seq->lower = grp->instance;
250 seq->upper = grp->instance;
251 *scope = grp->scope;
252}
253
254void tipc_group_update_bc_members(struct tipc_group *grp)
255{
256 grp->bc_snd_nxt++;
257}
258
259/* tipc_group_filter_msg() - determine if we should accept arriving message
260 */
261void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
262 struct sk_buff_head *xmitq)
263{
264 struct sk_buff *skb = __skb_dequeue(inputq);
265 struct tipc_member *m;
266 struct tipc_msg *hdr;
267 u32 node, port;
268 int mtyp;
269
270 if (!skb)
271 return;
272
273 hdr = buf_msg(skb);
274 mtyp = msg_type(hdr);
275 node = msg_orignode(hdr);
276 port = msg_origport(hdr);
277
278 if (!msg_in_group(hdr))
279 goto drop;
280
281 m = tipc_group_find_member(grp, node, port);
282 if (!tipc_group_is_receiver(m))
283 goto drop;
284
285 __skb_queue_tail(inputq, skb);
286
287 m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
288 return;
289drop:
290 kfree_skb(skb);
291}
292
293static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
294 int mtyp, struct sk_buff_head *xmitq)
295{
296 struct tipc_msg *hdr;
297 struct sk_buff *skb;
298
299 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
300 m->node, tipc_own_addr(grp->net),
301 m->port, grp->portid, 0);
302 if (!skb)
303 return;
304
305 hdr = buf_msg(skb);
306 if (mtyp == GRP_JOIN_MSG)
307 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
308 __skb_queue_tail(xmitq, skb);
309}
310
311void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
312 struct sk_buff_head *xmitq)
313{
314 u32 node = msg_orignode(hdr);
315 u32 port = msg_origport(hdr);
316 struct tipc_member *m;
317
318 if (!grp)
319 return;
320
321 m = tipc_group_find_member(grp, node, port);
322
323 switch (msg_type(hdr)) {
324 case GRP_JOIN_MSG:
325 if (!m)
326 m = tipc_group_create_member(grp, node, port,
327 MBR_QUARANTINED);
328 if (!m)
329 return;
330 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
331
332 /* Wait until PUBLISH event is received */
333 if (m->state == MBR_DISCOVERED)
334 m->state = MBR_JOINING;
335 else if (m->state == MBR_PUBLISHED)
336 m->state = MBR_JOINED;
337 return;
338 case GRP_LEAVE_MSG:
339 if (!m)
340 return;
341
342 /* Wait until WITHDRAW event is received */
343 if (m->state != MBR_LEAVING) {
344 m->state = MBR_LEAVING;
345 return;
346 }
347 /* Otherwise deliver already received WITHDRAW event */
348 tipc_group_delete_member(grp, m);
349 return;
350 default:
351 pr_warn("Received unknown GROUP_PROTO message\n");
352 }
353}
354
355/* tipc_group_member_evt() - receive and handle a member up/down event
356 */
357void tipc_group_member_evt(struct tipc_group *grp,
358 struct sk_buff *skb,
359 struct sk_buff_head *xmitq)
360{
361 struct tipc_msg *hdr = buf_msg(skb);
362 struct tipc_event *evt = (void *)msg_data(hdr);
363 u32 node = evt->port.node;
364 u32 port = evt->port.ref;
365 struct tipc_member *m;
366 struct net *net;
367 u32 self;
368
369 if (!grp)
370 goto drop;
371
372 net = grp->net;
373 self = tipc_own_addr(net);
374 if (!grp->loopback && node == self && port == grp->portid)
375 goto drop;
376
377 m = tipc_group_find_member(grp, node, port);
378
379 if (evt->event == TIPC_PUBLISHED) {
380 if (!m)
381 m = tipc_group_create_member(grp, node, port,
382 MBR_DISCOVERED);
383 if (!m)
384 goto drop;
385
386 /* Wait if JOIN message not yet received */
387 if (m->state == MBR_DISCOVERED)
388 m->state = MBR_PUBLISHED;
389 else
390 m->state = MBR_JOINED;
391 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
392 } else if (evt->event == TIPC_WITHDRAWN) {
393 if (!m)
394 goto drop;
395
396 /* Keep back event if more messages might be expected */
397 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node))
398 m->state = MBR_LEAVING;
399 else
400 tipc_group_delete_member(grp, m);
401 }
402drop:
403 kfree_skb(skb);
404}