]> git.ipfire.org Git - thirdparty/kernel/stable.git/blame - net/tipc/group.c
tipc: introduce group multicast messaging
[thirdparty/kernel/stable.git] / net / tipc / group.c
CommitLineData
75da2163
JM
1/*
2 * net/tipc/group.c: TIPC group messaging code
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "core.h"
37#include "addr.h"
38#include "group.h"
39#include "bcast.h"
40#include "server.h"
41#include "msg.h"
42#include "socket.h"
43#include "node.h"
44#include "name_table.h"
45#include "subscr.h"
46
/* Flow control advertisement sizes, counted in blocks of FLOWCTL_BLK_SZ.
 * ADV_UNIT:   blocks spanned by a maximum size message plus header, plus one
 * ADV_IDLE:   threshold below which a member's send window counts as congested
 * ADV_ACTIVE: amount kept advertised towards a member in state MBR_JOINED
 */
#define ADV_UNIT	(((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
#define ADV_IDLE	ADV_UNIT
#define ADV_ACTIVE	(ADV_UNIT * 12)
75da2163
JM
50
/* Life cycle states of a group member as seen from this socket.
 * The numerical order matters: the code compares states with '<' and '>='.
 */
enum mbr_state {
	MBR_QUARANTINED,	/* created on JOIN msg, no PUBLISH event seen yet */
	MBR_DISCOVERED,		/* created on PUBLISH event or by add_member() */
	MBR_JOINING,		/* was DISCOVERED, JOIN msg has arrived */
	MBR_PUBLISHED,		/* PUBLISH event held back, waiting for JOIN msg */
	MBR_JOINED,		/* both PUBLISH event and JOIN msg received */
	MBR_LEAVING		/* one of LEAVE msg / WITHDRAW event received */
};
59
60struct tipc_member {
61 struct rb_node tree_node;
62 struct list_head list;
b7d42635 63 struct list_head congested;
ae236fb2 64 struct sk_buff *event_msg;
b7d42635 65 struct tipc_group *group;
75da2163
JM
66 u32 node;
67 u32 port;
31c82a2d 68 u32 instance;
75da2163 69 enum mbr_state state;
b7d42635
JM
70 u16 advertised;
71 u16 window;
75da2163 72 u16 bc_rcv_nxt;
b7d42635 73 bool usr_pending;
75da2163
JM
74};
75
76struct tipc_group {
77 struct rb_root members;
b7d42635 78 struct list_head congested;
75da2163
JM
79 struct tipc_nlist dests;
80 struct net *net;
81 int subid;
82 u32 type;
83 u32 instance;
84 u32 domain;
85 u32 scope;
86 u32 portid;
87 u16 member_cnt;
88 u16 bc_snd_nxt;
89 bool loopback;
ae236fb2 90 bool events;
75da2163
JM
91};
92
/* Forward declaration; the definition follows further down in this file */
static void tipc_group_proto_xmit(struct tipc_group *grp,
				  struct tipc_member *m, int mtyp,
				  struct sk_buff_head *xmitq);
95
b7d42635
JM
96static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
97{
98 int mcnt = grp->member_cnt + 1;
99
100 /* Scale to bytes, considering worst-case truesize/msgsize ratio */
101 return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
102}
103
75da2163
JM
104u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
105{
106 return grp->bc_snd_nxt;
107}
108
b7d42635
JM
109static bool tipc_group_is_enabled(struct tipc_member *m)
110{
111 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
112}
113
75da2163
JM
114static bool tipc_group_is_receiver(struct tipc_member *m)
115{
116 return m && m->state >= MBR_JOINED;
117}
118
ee106d7f
JM
119u32 tipc_group_exclude(struct tipc_group *grp)
120{
121 if (!grp->loopback)
122 return grp->portid;
123 return 0;
124}
125
75da2163
JM
126int tipc_group_size(struct tipc_group *grp)
127{
128 return grp->member_cnt;
129}
130
131struct tipc_group *tipc_group_create(struct net *net, u32 portid,
132 struct tipc_group_req *mreq)
133{
134 struct tipc_group *grp;
135 u32 type = mreq->type;
136
137 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
138 if (!grp)
139 return NULL;
140 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
b7d42635 141 INIT_LIST_HEAD(&grp->congested);
75da2163
JM
142 grp->members = RB_ROOT;
143 grp->net = net;
144 grp->portid = portid;
145 grp->domain = addr_domain(net, mreq->scope);
146 grp->type = type;
147 grp->instance = mreq->instance;
148 grp->scope = mreq->scope;
149 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
ae236fb2 150 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
75da2163
JM
151 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
152 return grp;
153 kfree(grp);
154 return NULL;
155}
156
157void tipc_group_delete(struct net *net, struct tipc_group *grp)
158{
159 struct rb_root *tree = &grp->members;
160 struct tipc_member *m, *tmp;
161 struct sk_buff_head xmitq;
162
163 __skb_queue_head_init(&xmitq);
164
165 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
166 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
167 list_del(&m->list);
168 kfree(m);
169 }
170 tipc_node_distr_xmit(net, &xmitq);
171 tipc_nlist_purge(&grp->dests);
172 tipc_topsrv_kern_unsubscr(net, grp->subid);
173 kfree(grp);
174}
175
176struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
177 u32 node, u32 port)
178{
179 struct rb_node *n = grp->members.rb_node;
180 u64 nkey, key = (u64)node << 32 | port;
181 struct tipc_member *m;
182
183 while (n) {
184 m = container_of(n, struct tipc_member, tree_node);
185 nkey = (u64)m->node << 32 | m->port;
186 if (key < nkey)
187 n = n->rb_left;
188 else if (key > nkey)
189 n = n->rb_right;
190 else
191 return m;
192 }
193 return NULL;
194}
195
27bd9ec0
JM
196static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
197 u32 node, u32 port)
198{
199 struct tipc_member *m;
200
201 m = tipc_group_find_member(grp, node, port);
202 if (m && tipc_group_is_enabled(m))
203 return m;
204 return NULL;
205}
206
75da2163
JM
207static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
208 u32 node)
209{
210 struct tipc_member *m;
211 struct rb_node *n;
212
213 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
214 m = container_of(n, struct tipc_member, tree_node);
215 if (m->node == node)
216 return m;
217 }
218 return NULL;
219}
220
221static void tipc_group_add_to_tree(struct tipc_group *grp,
222 struct tipc_member *m)
223{
224 u64 nkey, key = (u64)m->node << 32 | m->port;
225 struct rb_node **n, *parent = NULL;
226 struct tipc_member *tmp;
227
228 n = &grp->members.rb_node;
229 while (*n) {
230 tmp = container_of(*n, struct tipc_member, tree_node);
231 parent = *n;
232 tmp = container_of(parent, struct tipc_member, tree_node);
233 nkey = (u64)tmp->node << 32 | tmp->port;
234 if (key < nkey)
235 n = &(*n)->rb_left;
236 else if (key > nkey)
237 n = &(*n)->rb_right;
238 else
239 return;
240 }
241 rb_link_node(&m->tree_node, parent, n);
242 rb_insert_color(&m->tree_node, &grp->members);
243}
244
245static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
246 u32 node, u32 port,
247 int state)
248{
249 struct tipc_member *m;
250
251 m = kzalloc(sizeof(*m), GFP_ATOMIC);
252 if (!m)
253 return NULL;
254 INIT_LIST_HEAD(&m->list);
b7d42635
JM
255 INIT_LIST_HEAD(&m->congested);
256 m->group = grp;
75da2163
JM
257 m->node = node;
258 m->port = port;
259 grp->member_cnt++;
260 tipc_group_add_to_tree(grp, m);
261 tipc_nlist_add(&grp->dests, m->node);
262 m->state = state;
263 return m;
264}
265
266void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
267{
268 tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
269}
270
271static void tipc_group_delete_member(struct tipc_group *grp,
272 struct tipc_member *m)
273{
274 rb_erase(&m->tree_node, &grp->members);
275 grp->member_cnt--;
276 list_del_init(&m->list);
b7d42635 277 list_del_init(&m->congested);
75da2163
JM
278
279 /* If last member on a node, remove node from dest list */
280 if (!tipc_group_find_node(grp, m->node))
281 tipc_nlist_del(&grp->dests, m->node);
282
283 kfree(m);
284}
285
286struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
287{
288 return &grp->dests;
289}
290
291void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
292 int *scope)
293{
294 seq->type = grp->type;
295 seq->lower = grp->instance;
296 seq->upper = grp->instance;
297 *scope = grp->scope;
298}
299
b7d42635
JM
300void tipc_group_update_member(struct tipc_member *m, int len)
301{
302 struct tipc_group *grp = m->group;
303 struct tipc_member *_m, *tmp;
304
305 if (!tipc_group_is_enabled(m))
306 return;
307
308 m->window -= len;
309
310 if (m->window >= ADV_IDLE)
311 return;
312
313 if (!list_empty(&m->congested))
314 return;
315
316 /* Sort member into congested members' list */
317 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
318 if (m->window > _m->window)
319 continue;
320 list_add_tail(&m->congested, &_m->congested);
321 return;
322 }
323 list_add_tail(&m->congested, &grp->congested);
324}
325
326void tipc_group_update_bc_members(struct tipc_group *grp, int len)
75da2163 327{
b7d42635
JM
328 struct tipc_member *m;
329 struct rb_node *n;
330
331 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
332 m = container_of(n, struct tipc_member, tree_node);
333 if (tipc_group_is_enabled(m))
334 tipc_group_update_member(m, len);
335 }
75da2163
JM
336 grp->bc_snd_nxt++;
337}
338
27bd9ec0
JM
339bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
340 int len, struct tipc_member **mbr)
b7d42635 341{
27bd9ec0 342 struct sk_buff_head xmitq;
b7d42635 343 struct tipc_member *m;
27bd9ec0
JM
344 int adv, state;
345
346 m = tipc_group_find_dest(grp, dnode, dport);
347 *mbr = m;
348 if (!m)
349 return false;
350 if (m->usr_pending)
351 return true;
352 if (m->window >= len)
353 return false;
354 m->usr_pending = true;
355
356 /* If not fully advertised, do it now to prevent mutual blocking */
357 adv = m->advertised;
358 state = m->state;
359 if (state < MBR_JOINED)
360 return true;
361 if (state == MBR_JOINED && adv == ADV_IDLE)
362 return true;
363 skb_queue_head_init(&xmitq);
364 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
365 tipc_node_distr_xmit(grp->net, &xmitq);
366 return true;
367}
368
369bool tipc_group_bc_cong(struct tipc_group *grp, int len)
370{
371 struct tipc_member *m = NULL;
b7d42635
JM
372
373 if (list_empty(&grp->congested))
374 return false;
375
376 m = list_first_entry(&grp->congested, struct tipc_member, congested);
377 if (m->window >= len)
378 return false;
379
27bd9ec0 380 return tipc_group_cong(grp, m->node, m->port, len, &m);
b7d42635
JM
381}
382
75da2163
JM
383/* tipc_group_filter_msg() - determine if we should accept arriving message
384 */
385void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
386 struct sk_buff_head *xmitq)
387{
388 struct sk_buff *skb = __skb_dequeue(inputq);
389 struct tipc_member *m;
390 struct tipc_msg *hdr;
391 u32 node, port;
392 int mtyp;
393
394 if (!skb)
395 return;
396
397 hdr = buf_msg(skb);
398 mtyp = msg_type(hdr);
399 node = msg_orignode(hdr);
400 port = msg_origport(hdr);
401
402 if (!msg_in_group(hdr))
403 goto drop;
404
ae236fb2
JM
405 if (mtyp == TIPC_GRP_MEMBER_EVT) {
406 if (!grp->events)
407 goto drop;
408 __skb_queue_tail(inputq, skb);
409 return;
410 }
411
75da2163
JM
412 m = tipc_group_find_member(grp, node, port);
413 if (!tipc_group_is_receiver(m))
414 goto drop;
415
5b8dddb6
JM
416 m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
417
418 /* Drop multicast here if not for this member */
419 if (mtyp == TIPC_GRP_MCAST_MSG) {
420 if (msg_nameinst(hdr) != grp->instance) {
421 m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
422 tipc_group_update_rcv_win(grp, msg_blocks(hdr),
423 node, port, xmitq);
424 kfree_skb(skb);
425 return;
426 }
427 }
428
31c82a2d 429 TIPC_SKB_CB(skb)->orig_member = m->instance;
75da2163
JM
430 __skb_queue_tail(inputq, skb);
431
75da2163
JM
432 return;
433drop:
434 kfree_skb(skb);
435}
436
b7d42635
JM
437void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
438 u32 port, struct sk_buff_head *xmitq)
439{
440 struct tipc_member *m;
441
442 m = tipc_group_find_member(grp, node, port);
443 if (!m)
444 return;
445
446 m->advertised -= blks;
447
448 switch (m->state) {
449 case MBR_JOINED:
450 if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
451 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
452 break;
453 case MBR_DISCOVERED:
454 case MBR_JOINING:
455 case MBR_LEAVING:
456 default:
457 break;
458 }
459}
460
75da2163
JM
461static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
462 int mtyp, struct sk_buff_head *xmitq)
463{
464 struct tipc_msg *hdr;
465 struct sk_buff *skb;
b7d42635 466 int adv = 0;
75da2163
JM
467
468 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
469 m->node, tipc_own_addr(grp->net),
470 m->port, grp->portid, 0);
471 if (!skb)
472 return;
473
b7d42635
JM
474 if (m->state == MBR_JOINED)
475 adv = ADV_ACTIVE - m->advertised;
476
75da2163 477 hdr = buf_msg(skb);
b7d42635
JM
478
479 if (mtyp == GRP_JOIN_MSG) {
75da2163 480 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
b7d42635
JM
481 msg_set_adv_win(hdr, adv);
482 m->advertised += adv;
483 } else if (mtyp == GRP_ADV_MSG) {
484 msg_set_adv_win(hdr, adv);
485 m->advertised += adv;
486 }
75da2163
JM
487 __skb_queue_tail(xmitq, skb);
488}
489
b7d42635
JM
490void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
491 struct tipc_msg *hdr, struct sk_buff_head *inputq,
75da2163
JM
492 struct sk_buff_head *xmitq)
493{
494 u32 node = msg_orignode(hdr);
495 u32 port = msg_origport(hdr);
496 struct tipc_member *m;
497
498 if (!grp)
499 return;
500
501 m = tipc_group_find_member(grp, node, port);
502
503 switch (msg_type(hdr)) {
504 case GRP_JOIN_MSG:
505 if (!m)
506 m = tipc_group_create_member(grp, node, port,
507 MBR_QUARANTINED);
508 if (!m)
509 return;
510 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
b7d42635 511 m->window += msg_adv_win(hdr);
75da2163
JM
512
513 /* Wait until PUBLISH event is received */
ae236fb2 514 if (m->state == MBR_DISCOVERED) {
75da2163 515 m->state = MBR_JOINING;
ae236fb2 516 } else if (m->state == MBR_PUBLISHED) {
75da2163 517 m->state = MBR_JOINED;
b7d42635
JM
518 *usr_wakeup = true;
519 m->usr_pending = false;
520 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
ae236fb2
JM
521 __skb_queue_tail(inputq, m->event_msg);
522 }
b7d42635
JM
523 if (m->window < ADV_IDLE)
524 tipc_group_update_member(m, 0);
525 else
526 list_del_init(&m->congested);
75da2163
JM
527 return;
528 case GRP_LEAVE_MSG:
529 if (!m)
530 return;
531
532 /* Wait until WITHDRAW event is received */
533 if (m->state != MBR_LEAVING) {
534 m->state = MBR_LEAVING;
535 return;
536 }
537 /* Otherwise deliver already received WITHDRAW event */
ae236fb2 538 __skb_queue_tail(inputq, m->event_msg);
b7d42635 539 *usr_wakeup = m->usr_pending;
75da2163 540 tipc_group_delete_member(grp, m);
b7d42635
JM
541 list_del_init(&m->congested);
542 return;
543 case GRP_ADV_MSG:
544 if (!m)
545 return;
546 m->window += msg_adv_win(hdr);
547 *usr_wakeup = m->usr_pending;
548 m->usr_pending = false;
549 list_del_init(&m->congested);
75da2163
JM
550 return;
551 default:
552 pr_warn("Received unknown GROUP_PROTO message\n");
553 }
554}
555
b7d42635
JM
556/* tipc_group_member_evt() - receive and handle a member up/down event
557 */
75da2163 558void tipc_group_member_evt(struct tipc_group *grp,
b7d42635
JM
559 bool *usr_wakeup,
560 int *sk_rcvbuf,
75da2163 561 struct sk_buff *skb,
ae236fb2 562 struct sk_buff_head *inputq,
75da2163
JM
563 struct sk_buff_head *xmitq)
564{
565 struct tipc_msg *hdr = buf_msg(skb);
566 struct tipc_event *evt = (void *)msg_data(hdr);
ae236fb2 567 u32 instance = evt->found_lower;
75da2163
JM
568 u32 node = evt->port.node;
569 u32 port = evt->port.ref;
ae236fb2 570 int event = evt->event;
75da2163
JM
571 struct tipc_member *m;
572 struct net *net;
573 u32 self;
574
575 if (!grp)
576 goto drop;
577
578 net = grp->net;
579 self = tipc_own_addr(net);
580 if (!grp->loopback && node == self && port == grp->portid)
581 goto drop;
582
ae236fb2
JM
583 /* Convert message before delivery to user */
584 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
585 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
586 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
587 msg_set_origport(hdr, port);
588 msg_set_orignode(hdr, node);
589 msg_set_nametype(hdr, grp->type);
590 msg_set_grp_evt(hdr, event);
591
75da2163
JM
592 m = tipc_group_find_member(grp, node, port);
593
ae236fb2 594 if (event == TIPC_PUBLISHED) {
75da2163
JM
595 if (!m)
596 m = tipc_group_create_member(grp, node, port,
597 MBR_DISCOVERED);
598 if (!m)
599 goto drop;
600
ae236fb2
JM
601 /* Hold back event if JOIN message not yet received */
602 if (m->state == MBR_DISCOVERED) {
603 m->event_msg = skb;
75da2163 604 m->state = MBR_PUBLISHED;
ae236fb2
JM
605 } else {
606 __skb_queue_tail(inputq, skb);
75da2163 607 m->state = MBR_JOINED;
b7d42635
JM
608 *usr_wakeup = true;
609 m->usr_pending = false;
ae236fb2
JM
610 }
611 m->instance = instance;
612 TIPC_SKB_CB(skb)->orig_member = m->instance;
75da2163 613 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
b7d42635
JM
614 if (m->window < ADV_IDLE)
615 tipc_group_update_member(m, 0);
616 else
617 list_del_init(&m->congested);
ae236fb2 618 } else if (event == TIPC_WITHDRAWN) {
75da2163
JM
619 if (!m)
620 goto drop;
621
ae236fb2
JM
622 TIPC_SKB_CB(skb)->orig_member = m->instance;
623
b7d42635
JM
624 *usr_wakeup = m->usr_pending;
625 m->usr_pending = false;
626
ae236fb2
JM
627 /* Hold back event if more messages might be expected */
628 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
629 m->event_msg = skb;
75da2163 630 m->state = MBR_LEAVING;
ae236fb2
JM
631 } else {
632 __skb_queue_tail(inputq, skb);
75da2163 633 tipc_group_delete_member(grp, m);
ae236fb2 634 }
b7d42635 635 list_del_init(&m->congested);
75da2163 636 }
b7d42635 637 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
ae236fb2 638 return;
75da2163
JM
639drop:
640 kfree_skb(skb);
641}