--- /dev/null
+From d2f394dc4816b7bd1b44981d83509f18f19c53f0 Mon Sep 17 00:00:00 2001
+From: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
+Date: Thu, 1 Sep 2016 16:22:16 +0200
+Subject: tipc: fix random link resets while adding a second bearer
+
+From: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
+
+commit d2f394dc4816b7bd1b44981d83509f18f19c53f0 upstream.
+
+In a dual bearer configuration, if the second tipc link becomes
+active while the first link still has pending nametable "bulk"
+updates, it randomly leads to reset of the second link.
+
+When a link is established, the function named_distribute(),
+fills the skb based on node mtu (allows room for TUNNEL_PROTOCOL)
+with NAME_DISTRIBUTOR message for each PUBLICATION.
+However, the function named_distribute() allocates the buffer by
+increasing the node mtu by INT_H_SIZE (to insert NAME_DISTRIBUTOR).
+This consumes the space allocated for TUNNEL_PROTOCOL.
+
+When establishing the second link, the link shall tunnel all the
+messages in the first link queue including the "bulk" update.
+As size of the NAME_DISTRIBUTOR messages while tunnelling, exceeds
+the link mtu the transmission fails (-EMSGSIZE).
+
+Thus, the synch point based on the message count of the tunnel
+packets is never reached leading to link timeout.
+
+In this commit, we adjust the size of name distributor message so that
+they can be tunnelled.
+
+Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
+Signed-off-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
+Signed-off-by: David S. Miller <davem@davemloft.net>
+Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
+
+---
+ net/tipc/name_distr.c | 8 +++++---
+ 1 file changed, 5 insertions(+), 3 deletions(-)
+
+--- a/net/tipc/name_distr.c
++++ b/net/tipc/name_distr.c
+@@ -62,6 +62,8 @@ static void publ_to_item(struct distr_it
+
+ /**
+ * named_prepare_buf - allocate & initialize a publication message
++ *
++ * The buffer returned is of size INT_H_SIZE + payload size
+ */
+ static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size,
+ u32 dest)
+@@ -166,9 +168,9 @@ static void named_distribute(struct net
+ struct publication *publ;
+ struct sk_buff *skb = NULL;
+ struct distr_item *item = NULL;
+- uint msg_dsz = (tipc_node_get_mtu(net, dnode, 0) / ITEM_SIZE) *
+- ITEM_SIZE;
+- uint msg_rem = msg_dsz;
++ u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0) - INT_H_SIZE) /
++ ITEM_SIZE) * ITEM_SIZE;
++ u32 msg_rem = msg_dsz;
+
+ list_for_each_entry(publ, pls, local_list) {
+ /* Prepare next buffer: */
--- /dev/null
+From f1d048f24e66ba85d3dabf3d076cefa5f2b546b0 Mon Sep 17 00:00:00 2001
+From: Jon Paul Maloy <jon.maloy@ericsson.com>
+Date: Fri, 17 Jun 2016 06:35:57 -0400
+Subject: tipc: fix socket timer deadlock
+
+From: Jon Paul Maloy <jon.maloy@ericsson.com>
+
+commit f1d048f24e66ba85d3dabf3d076cefa5f2b546b0 upstream.
+
+We sometimes observe a 'deadly embrace' type deadlock occurring
+between mutually connected sockets on the same node. This happens
+when the one-hour peer supervision timers happen to expire
+simultaneously in both sockets.
+
+The scenario is as follows:
+
+CPU 1: CPU 2:
+-------- --------
+tipc_sk_timeout(sk1) tipc_sk_timeout(sk2)
+ lock(sk1.slock) lock(sk2.slock)
+ msg_create(probe) msg_create(probe)
+ unlock(sk1.slock) unlock(sk2.slock)
+ tipc_node_xmit_skb() tipc_node_xmit_skb()
+ tipc_node_xmit() tipc_node_xmit()
+ tipc_sk_rcv(sk2) tipc_sk_rcv(sk1)
+ lock(sk2.slock) lock((sk1.slock)
+ filter_rcv() filter_rcv()
+ tipc_sk_proto_rcv() tipc_sk_proto_rcv()
+ msg_create(probe_rsp) msg_create(probe_rsp)
+ tipc_sk_respond() tipc_sk_respond()
+ tipc_node_xmit_skb() tipc_node_xmit_skb()
+ tipc_node_xmit() tipc_node_xmit()
+ tipc_sk_rcv(sk1) tipc_sk_rcv(sk2)
+ lock((sk1.slock) lock((sk2.slock)
+ ===> DEADLOCK ===> DEADLOCK
+
+Further analysis reveals that there are three different locations in the
+socket code where tipc_sk_respond() is called within the context of the
+socket lock, with ensuing risk of similar deadlocks.
+
+We now solve this by passing a buffer queue along with all upcalls where
+sk_lock.slock may potentially be held. Response or rejected message
+buffers are accumulated into this queue instead of being sent out
+directly, and only sent once we know we are safely outside the slock
+context.
+
+Reported-by: GUNA <gbalasun@gmail.com>
+Acked-by: Ying Xue <ying.xue@windriver.com>
+Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
+Signed-off-by: David S. Miller <davem@davemloft.net>
+Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
+
+---
+ net/tipc/socket.c | 54 ++++++++++++++++++++++++++++++++++++++++++------------
+ 1 file changed, 42 insertions(+), 12 deletions(-)
+
+--- a/net/tipc/socket.c
++++ b/net/tipc/socket.c
+@@ -777,9 +777,11 @@ void tipc_sk_mcast_rcv(struct net *net,
+ * @tsk: receiving socket
+ * @skb: pointer to message buffer.
+ */
+-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
++static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
++ struct sk_buff_head *xmitq)
+ {
+ struct sock *sk = &tsk->sk;
++ u32 onode = tsk_own_node(tsk);
+ struct tipc_msg *hdr = buf_msg(skb);
+ int mtyp = msg_type(hdr);
+ int conn_cong;
+@@ -792,7 +794,8 @@ static void tipc_sk_proto_rcv(struct tip
+
+ if (mtyp == CONN_PROBE) {
+ msg_set_type(hdr, CONN_PROBE_REPLY);
+- tipc_sk_respond(sk, skb, TIPC_OK);
++ if (tipc_msg_reverse(onode, &skb, TIPC_OK))
++ __skb_queue_tail(xmitq, skb);
+ return;
+ } else if (mtyp == CONN_ACK) {
+ conn_cong = tsk_conn_cong(tsk);
+@@ -1647,7 +1650,8 @@ static unsigned int rcvbuf_limit(struct
+ *
+ * Returns true if message was added to socket receive queue, otherwise false
+ */
+-static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
++static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
++ struct sk_buff_head *xmitq)
+ {
+ struct socket *sock = sk->sk_socket;
+ struct tipc_sock *tsk = tipc_sk(sk);
+@@ -1657,7 +1661,7 @@ static bool filter_rcv(struct sock *sk,
+ int usr = msg_user(hdr);
+
+ if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
+- tipc_sk_proto_rcv(tsk, skb);
++ tipc_sk_proto_rcv(tsk, skb, xmitq);
+ return false;
+ }
+
+@@ -1700,7 +1704,8 @@ static bool filter_rcv(struct sock *sk,
+ return true;
+
+ reject:
+- tipc_sk_respond(sk, skb, err);
++ if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
++ __skb_queue_tail(xmitq, skb);
+ return false;
+ }
+
+@@ -1716,9 +1721,24 @@ reject:
+ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
+ {
+ unsigned int truesize = skb->truesize;
++ struct sk_buff_head xmitq;
++ u32 dnode, selector;
+
+- if (likely(filter_rcv(sk, skb)))
++ __skb_queue_head_init(&xmitq);
++
++ if (likely(filter_rcv(sk, skb, &xmitq))) {
+ atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
++ return 0;
++ }
++
++ if (skb_queue_empty(&xmitq))
++ return 0;
++
++ /* Send response/rejected message */
++ skb = __skb_dequeue(&xmitq);
++ dnode = msg_destnode(buf_msg(skb));
++ selector = msg_origport(buf_msg(skb));
++ tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
+ return 0;
+ }
+
+@@ -1732,12 +1752,13 @@ static int tipc_backlog_rcv(struct sock
+ * Caller must hold socket lock
+ */
+ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+- u32 dport)
++ u32 dport, struct sk_buff_head *xmitq)
+ {
++ unsigned long time_limit = jiffies + 2;
++ struct sk_buff *skb;
+ unsigned int lim;
+ atomic_t *dcnt;
+- struct sk_buff *skb;
+- unsigned long time_limit = jiffies + 2;
++ u32 onode;
+
+ while (skb_queue_len(inputq)) {
+ if (unlikely(time_after_eq(jiffies, time_limit)))
+@@ -1749,7 +1770,7 @@ static void tipc_sk_enqueue(struct sk_bu
+
+ /* Add message directly to receive queue if possible */
+ if (!sock_owned_by_user(sk)) {
+- filter_rcv(sk, skb);
++ filter_rcv(sk, skb, xmitq);
+ continue;
+ }
+
+@@ -1762,7 +1783,9 @@ static void tipc_sk_enqueue(struct sk_bu
+ continue;
+
+ /* Overload => reject message back to sender */
+- tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
++ onode = tipc_own_addr(sock_net(sk));
++ if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
++ __skb_queue_tail(xmitq, skb);
+ break;
+ }
+ }
+@@ -1775,12 +1798,14 @@ static void tipc_sk_enqueue(struct sk_bu
+ */
+ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
+ {
++ struct sk_buff_head xmitq;
+ u32 dnode, dport = 0;
+ int err;
+ struct tipc_sock *tsk;
+ struct sock *sk;
+ struct sk_buff *skb;
+
++ __skb_queue_head_init(&xmitq);
+ while (skb_queue_len(inputq)) {
+ dport = tipc_skb_peek_port(inputq, dport);
+ tsk = tipc_sk_lookup(net, dport);
+@@ -1788,9 +1813,14 @@ void tipc_sk_rcv(struct net *net, struct
+ if (likely(tsk)) {
+ sk = &tsk->sk;
+ if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
+- tipc_sk_enqueue(inputq, sk, dport);
++ tipc_sk_enqueue(inputq, sk, dport, &xmitq);
+ spin_unlock_bh(&sk->sk_lock.slock);
+ }
++ /* Send pending response/rejected messages, if any */
++ while ((skb = __skb_dequeue(&xmitq))) {
++ dnode = msg_destnode(buf_msg(skb));
++ tipc_node_xmit_skb(net, skb, dnode, dport);
++ }
+ sock_put(sk);
+ continue;
+ }