]> git.ipfire.org Git - people/ms/linux.git/blame - fs/dlm/lowcomms.c
fs: dlm: add midcomms debugfs functionality
[people/ms/linux.git] / fs / dlm / lowcomms.c
CommitLineData
2522fe45 1// SPDX-License-Identifier: GPL-2.0-only
fdda387f
PC
2/******************************************************************************
3*******************************************************************************
4**
5** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5e9ccc37 6** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
fdda387f 7**
fdda387f
PC
8**
9*******************************************************************************
10******************************************************************************/
11
/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP address or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */
44
fdda387f
PC
45#include <asm/ioctls.h>
46#include <net/sock.h>
47#include <net/tcp.h>
48#include <linux/pagemap.h>
6ed7257b 49#include <linux/file.h>
7a936ce7 50#include <linux/mutex.h>
6ed7257b 51#include <linux/sctp.h>
5a0e3ad6 52#include <linux/slab.h>
2f2d76cc 53#include <net/sctp/sctp.h>
44ad532b 54#include <net/ipv6.h>
fdda387f
PC
55
56#include "dlm_internal.h"
57#include "lowcomms.h"
58#include "midcomms.h"
59#include "config.h"
60
6ed7257b
PC
/* 4 MiB; NOTE(review): presumably the receive buffer size requested for
 * sockets - the user is outside this part of the file, confirm there.
 */
#define NEEDED_RMEM (4 * 1024 * 1024)

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25

/* How long shutdown_connection() waits for CF_SHUTDOWN to be cleared
 * before it falls back to a forced close.
 */
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
f92c8dd7 66
fdda387f
PC
67struct connection {
68 struct socket *sock; /* NULL if not connected */
69 uint32_t nodeid; /* So we know who we are in the list */
f1f1c1cc 70 struct mutex sock_mutex;
6ed7257b 71 unsigned long flags;
fdda387f 72#define CF_READ_PENDING 1
8a4abb08 73#define CF_WRITE_PENDING 2
6ed7257b
PC
74#define CF_INIT_PENDING 4
75#define CF_IS_OTHERCON 5
063c4c99 76#define CF_CLOSE 6
b36930dd 77#define CF_APP_LIMITED 7
b2a66629 78#define CF_CLOSING 8
055923bf 79#define CF_SHUTDOWN 9
19633c7e 80#define CF_CONNECTED 10
ba868d9d
AA
81#define CF_RECONNECT 11
82#define CF_DELAY_CONNECT 12
8aa31cbf 83#define CF_EOF 13
ac33d071 84 struct list_head writequeue; /* List of outgoing writequeue_entries */
fdda387f 85 spinlock_t writequeue_lock;
8aa31cbf 86 atomic_t writequeue_cnt;
6ed7257b 87 void (*connect_action) (struct connection *); /* What to do to connect */
055923bf 88 void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
8aa31cbf 89 bool (*eof_condition)(struct connection *con); /* What to do to eof check */
fdda387f 90 int retries;
fdda387f 91#define MAX_CONNECT_RETRIES 3
5e9ccc37 92 struct hlist_node list;
fdda387f 93 struct connection *othercon;
ba868d9d 94 struct connection *sendcon;
1d6e8131
PC
95 struct work_struct rwork; /* Receive workqueue */
96 struct work_struct swork; /* Send workqueue */
055923bf 97 wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
4798cbbf
AA
98 unsigned char *rx_buf;
99 int rx_buflen;
100 int rx_leftover;
a47666eb 101 struct rcu_head rcu;
fdda387f
PC
102};
103#define sock2con(x) ((struct connection *)(x)->sk_user_data)
104
d11ccd45
AA
105struct listen_connection {
106 struct socket *sock;
107 struct work_struct rwork;
108};
109
f0747ebf
AA
/*
 * Byte accounting for a writequeue entry @e (pointer to an object with
 * 'offset' and 'end' members). The argument is parenthesized so that any
 * pointer expression (e.g. "p + 1" or "&entry") expands correctly.
 */
/* space left between 'end' and the end of the page */
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - (e)->end)
/* bytes between 'offset' and 'end' */
#define DLM_WQ_LENGTH_BYTES(e) ((e)->end - (e)->offset)
112
fdda387f
PC
113/* An entry waiting to be sent */
114struct writequeue_entry {
115 struct list_head list;
116 struct page *page;
117 int offset;
118 int len;
119 int end;
120 int users;
121 struct connection *con;
8f2dc78d
AA
122 struct list_head msgs;
123 struct kref ref;
124};
125
126struct dlm_msg {
127 struct writequeue_entry *entry;
2874d1a6
AA
128 struct dlm_msg *orig_msg;
129 bool retransmit;
8f2dc78d
AA
130 void *ppc;
131 int len;
132 int idx; /* new()/commit() idx exchange */
133
134 struct list_head list;
135 struct kref ref;
fdda387f
PC
136};
137
36b71a8b
DT
138struct dlm_node_addr {
139 struct list_head list;
140 int nodeid;
e125fbeb 141 int mark;
36b71a8b 142 int addr_count;
98e1b60e 143 int curr_addr_index;
36b71a8b
DT
144 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
145};
146
cc661fc9
BP
/*
 * Socket callbacks as they were before lowcomms installed its own ones.
 * Filled in by save_listen_callbacks() and written back by
 * restore_callbacks().
 */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;
153
36b71a8b
DT
154static LIST_HEAD(dlm_node_addrs);
155static DEFINE_SPINLOCK(dlm_node_addrs_spin);
156
d11ccd45 157static struct listen_connection listen_con;
6ed7257b
PC
158static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
159static int dlm_local_count;
51746163 160int dlm_allow_conn;
fdda387f 161
1d6e8131
PC
162/* Work queues */
163static struct workqueue_struct *recv_workqueue;
164static struct workqueue_struct *send_workqueue;
fdda387f 165
5e9ccc37 166static struct hlist_head connection_hash[CONN_HASH_SIZE];
a47666eb
AA
167static DEFINE_SPINLOCK(connections_lock);
168DEFINE_STATIC_SRCU(connections_srcu);
fdda387f 169
1d6e8131
PC
170static void process_recv_sockets(struct work_struct *work);
171static void process_send_sockets(struct work_struct *work);
fdda387f 172
0672c3c2
AA
173static void sctp_connect_to_sock(struct connection *con);
174static void tcp_connect_to_sock(struct connection *con);
42873c90 175static void dlm_tcp_shutdown(struct connection *con);
5e9ccc37 176
b38bc9c2 177static struct connection *__find_con(int nodeid, int r)
5e9ccc37 178{
5e9ccc37
CC
179 struct connection *con;
180
a47666eb 181 hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
b38bc9c2 182 if (con->nodeid == nodeid)
5e9ccc37
CC
183 return con;
184 }
a47666eb 185
5e9ccc37
CC
186 return NULL;
187}
188
8aa31cbf
AA
189static bool tcp_eof_condition(struct connection *con)
190{
191 return atomic_read(&con->writequeue_cnt);
192}
193
6cde210a 194static int dlm_con_init(struct connection *con, int nodeid)
fdda387f 195{
4798cbbf
AA
196 con->rx_buflen = dlm_config.ci_buffer_size;
197 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
6cde210a
AA
198 if (!con->rx_buf)
199 return -ENOMEM;
4798cbbf 200
6ed7257b
PC
201 con->nodeid = nodeid;
202 mutex_init(&con->sock_mutex);
203 INIT_LIST_HEAD(&con->writequeue);
204 spin_lock_init(&con->writequeue_lock);
8aa31cbf 205 atomic_set(&con->writequeue_cnt, 0);
6ed7257b
PC
206 INIT_WORK(&con->swork, process_send_sockets);
207 INIT_WORK(&con->rwork, process_recv_sockets);
055923bf 208 init_waitqueue_head(&con->shutdown_wait);
fdda387f 209
42873c90 210 if (dlm_config.ci_protocol == 0) {
0672c3c2 211 con->connect_action = tcp_connect_to_sock;
42873c90 212 con->shutdown_action = dlm_tcp_shutdown;
8aa31cbf 213 con->eof_condition = tcp_eof_condition;
42873c90 214 } else {
0672c3c2 215 con->connect_action = sctp_connect_to_sock;
42873c90 216 }
fdda387f 217
6cde210a
AA
218 return 0;
219}
220
221/*
222 * If 'allocation' is zero then we don't attempt to create a new
223 * connection structure for this node.
224 */
225static struct connection *nodeid2con(int nodeid, gfp_t alloc)
226{
227 struct connection *con, *tmp;
228 int r, ret;
229
b38bc9c2
AA
230 r = nodeid_hash(nodeid);
231 con = __find_con(nodeid, r);
6cde210a
AA
232 if (con || !alloc)
233 return con;
234
235 con = kzalloc(sizeof(*con), alloc);
236 if (!con)
237 return NULL;
238
239 ret = dlm_con_init(con, nodeid);
240 if (ret) {
241 kfree(con);
242 return NULL;
243 }
244
a47666eb 245 spin_lock(&connections_lock);
4f2b30fd
AA
246 /* Because multiple workqueues/threads calls this function it can
247 * race on multiple cpu's. Instead of locking hot path __find_con()
248 * we just check in rare cases of recently added nodes again
249 * under protection of connections_lock. If this is the case we
250 * abort our connection creation and return the existing connection.
251 */
b38bc9c2 252 tmp = __find_con(nodeid, r);
4f2b30fd
AA
253 if (tmp) {
254 spin_unlock(&connections_lock);
255 kfree(con->rx_buf);
256 kfree(con);
257 return tmp;
258 }
259
a47666eb
AA
260 hlist_add_head_rcu(&con->list, &connection_hash[r]);
261 spin_unlock(&connections_lock);
262
6ed7257b
PC
263 return con;
264}
265
5e9ccc37
CC
266/* Loop round all connections */
267static void foreach_conn(void (*conn_func)(struct connection *c))
268{
b38bc9c2 269 int i;
5e9ccc37
CC
270 struct connection *con;
271
272 for (i = 0; i < CONN_HASH_SIZE; i++) {
a47666eb 273 hlist_for_each_entry_rcu(con, &connection_hash[i], list)
5e9ccc37 274 conn_func(con);
5e9ccc37 275 }
fdda387f
PC
276}
277
36b71a8b
DT
278static struct dlm_node_addr *find_node_addr(int nodeid)
279{
280 struct dlm_node_addr *na;
281
282 list_for_each_entry(na, &dlm_node_addrs, list) {
283 if (na->nodeid == nodeid)
284 return na;
285 }
286 return NULL;
287}
288
40c6b83e
AA
289static int addr_compare(const struct sockaddr_storage *x,
290 const struct sockaddr_storage *y)
6ed7257b 291{
36b71a8b
DT
292 switch (x->ss_family) {
293 case AF_INET: {
294 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
295 struct sockaddr_in *siny = (struct sockaddr_in *)y;
296 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
297 return 0;
298 if (sinx->sin_port != siny->sin_port)
299 return 0;
300 break;
301 }
302 case AF_INET6: {
303 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
304 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
305 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
306 return 0;
307 if (sinx->sin6_port != siny->sin6_port)
308 return 0;
309 break;
310 }
311 default:
312 return 0;
313 }
314 return 1;
315}
316
317static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
e125fbeb
AA
318 struct sockaddr *sa_out, bool try_new_addr,
319 unsigned int *mark)
36b71a8b
DT
320{
321 struct sockaddr_storage sas;
322 struct dlm_node_addr *na;
6ed7257b
PC
323
324 if (!dlm_local_count)
325 return -1;
326
36b71a8b
DT
327 spin_lock(&dlm_node_addrs_spin);
328 na = find_node_addr(nodeid);
98e1b60e 329 if (na && na->addr_count) {
ee44b4bc
MRL
330 memcpy(&sas, na->addr[na->curr_addr_index],
331 sizeof(struct sockaddr_storage));
332
98e1b60e
MC
333 if (try_new_addr) {
334 na->curr_addr_index++;
335 if (na->curr_addr_index == na->addr_count)
336 na->curr_addr_index = 0;
337 }
98e1b60e 338 }
36b71a8b
DT
339 spin_unlock(&dlm_node_addrs_spin);
340
341 if (!na)
342 return -EEXIST;
343
344 if (!na->addr_count)
345 return -ENOENT;
346
e125fbeb
AA
347 *mark = na->mark;
348
36b71a8b
DT
349 if (sas_out)
350 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
351
352 if (!sa_out)
353 return 0;
6ed7257b
PC
354
355 if (dlm_local_addr[0]->ss_family == AF_INET) {
36b71a8b
DT
356 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
357 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
6ed7257b
PC
358 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
359 } else {
36b71a8b
DT
360 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
361 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
4e3fd7a0 362 ret6->sin6_addr = in6->sin6_addr;
6ed7257b
PC
363 }
364
365 return 0;
366}
367
e125fbeb
AA
368static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
369 unsigned int *mark)
36b71a8b
DT
370{
371 struct dlm_node_addr *na;
372 int rv = -EEXIST;
98e1b60e 373 int addr_i;
36b71a8b
DT
374
375 spin_lock(&dlm_node_addrs_spin);
376 list_for_each_entry(na, &dlm_node_addrs, list) {
377 if (!na->addr_count)
378 continue;
379
98e1b60e
MC
380 for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
381 if (addr_compare(na->addr[addr_i], addr)) {
382 *nodeid = na->nodeid;
e125fbeb 383 *mark = na->mark;
98e1b60e
MC
384 rv = 0;
385 goto unlock;
386 }
387 }
36b71a8b 388 }
98e1b60e 389unlock:
36b71a8b
DT
390 spin_unlock(&dlm_node_addrs_spin);
391 return rv;
392}
393
4f19d071
AA
394/* caller need to held dlm_node_addrs_spin lock */
395static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
396 const struct sockaddr_storage *addr)
397{
398 int i;
399
400 for (i = 0; i < na->addr_count; i++) {
401 if (addr_compare(na->addr[i], addr))
402 return true;
403 }
404
405 return false;
406}
407
36b71a8b
DT
408int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
409{
410 struct sockaddr_storage *new_addr;
411 struct dlm_node_addr *new_node, *na;
4f19d071 412 bool ret;
36b71a8b
DT
413
414 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
415 if (!new_node)
416 return -ENOMEM;
417
418 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
419 if (!new_addr) {
420 kfree(new_node);
421 return -ENOMEM;
422 }
423
424 memcpy(new_addr, addr, len);
425
426 spin_lock(&dlm_node_addrs_spin);
427 na = find_node_addr(nodeid);
428 if (!na) {
429 new_node->nodeid = nodeid;
430 new_node->addr[0] = new_addr;
431 new_node->addr_count = 1;
e125fbeb 432 new_node->mark = dlm_config.ci_mark;
36b71a8b
DT
433 list_add(&new_node->list, &dlm_node_addrs);
434 spin_unlock(&dlm_node_addrs_spin);
435 return 0;
436 }
437
4f19d071
AA
438 ret = dlm_lowcomms_na_has_addr(na, addr);
439 if (ret) {
440 spin_unlock(&dlm_node_addrs_spin);
441 kfree(new_addr);
442 kfree(new_node);
443 return -EEXIST;
444 }
445
36b71a8b
DT
446 if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
447 spin_unlock(&dlm_node_addrs_spin);
448 kfree(new_addr);
449 kfree(new_node);
450 return -ENOSPC;
451 }
452
453 na->addr[na->addr_count++] = new_addr;
454 spin_unlock(&dlm_node_addrs_spin);
455 kfree(new_node);
456 return 0;
457}
458
fdda387f 459/* Data available on socket or listen socket received a connect */
676d2369 460static void lowcomms_data_ready(struct sock *sk)
fdda387f 461{
93eaadeb 462 struct connection *con;
463
464 read_lock_bh(&sk->sk_callback_lock);
465 con = sock2con(sk);
afb853fb 466 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
1d6e8131 467 queue_work(recv_workqueue, &con->rwork);
93eaadeb 468 read_unlock_bh(&sk->sk_callback_lock);
fdda387f
PC
469}
470
d11ccd45
AA
471static void lowcomms_listen_data_ready(struct sock *sk)
472{
473 queue_work(recv_workqueue, &listen_con.rwork);
474}
475
fdda387f
PC
476static void lowcomms_write_space(struct sock *sk)
477{
93eaadeb 478 struct connection *con;
fdda387f 479
93eaadeb 480 read_lock_bh(&sk->sk_callback_lock);
481 con = sock2con(sk);
b36930dd 482 if (!con)
93eaadeb 483 goto out;
b36930dd 484
19633c7e
AA
485 if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
486 log_print("successful connected to node %d", con->nodeid);
487 queue_work(send_workqueue, &con->swork);
488 goto out;
489 }
490
b36930dd
DM
491 clear_bit(SOCK_NOSPACE, &con->sock->flags);
492
493 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
494 con->sock->sk->sk_write_pending--;
9cd3e072 495 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
b36930dd
DM
496 }
497
01da24d3 498 queue_work(send_workqueue, &con->swork);
93eaadeb 499out:
500 read_unlock_bh(&sk->sk_callback_lock);
fdda387f
PC
501}
502
503static inline void lowcomms_connect_sock(struct connection *con)
504{
063c4c99
LMB
505 if (test_bit(CF_CLOSE, &con->flags))
506 return;
61d9102b
BP
507 queue_work(send_workqueue, &con->swork);
508 cond_resched();
fdda387f
PC
509}
510
511static void lowcomms_state_change(struct sock *sk)
512{
ee44b4bc
MRL
513 /* SCTP layer is not calling sk_data_ready when the connection
514 * is done, so we catch the signal through here. Also, it
515 * doesn't switch socket state when entering shutdown, so we
516 * skip the write in that case.
517 */
518 if (sk->sk_shutdown) {
519 if (sk->sk_shutdown == RCV_SHUTDOWN)
520 lowcomms_data_ready(sk);
521 } else if (sk->sk_state == TCP_ESTABLISHED) {
fdda387f 522 lowcomms_write_space(sk);
ee44b4bc 523 }
fdda387f
PC
524}
525
391fbdc5
CC
526int dlm_lowcomms_connect_node(int nodeid)
527{
528 struct connection *con;
b38bc9c2 529 int idx;
391fbdc5
CC
530
531 if (nodeid == dlm_our_nodeid())
532 return 0;
533
b38bc9c2 534 idx = srcu_read_lock(&connections_srcu);
391fbdc5 535 con = nodeid2con(nodeid, GFP_NOFS);
b38bc9c2
AA
536 if (!con) {
537 srcu_read_unlock(&connections_srcu, idx);
391fbdc5 538 return -ENOMEM;
b38bc9c2
AA
539 }
540
391fbdc5 541 lowcomms_connect_sock(con);
b38bc9c2
AA
542 srcu_read_unlock(&connections_srcu, idx);
543
391fbdc5
CC
544 return 0;
545}
546
e125fbeb
AA
547int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
548{
549 struct dlm_node_addr *na;
550
551 spin_lock(&dlm_node_addrs_spin);
552 na = find_node_addr(nodeid);
553 if (!na) {
554 spin_unlock(&dlm_node_addrs_spin);
555 return -ENOENT;
556 }
557
558 na->mark = mark;
559 spin_unlock(&dlm_node_addrs_spin);
560
561 return 0;
562}
563
b3a5bbfd
BP
/*
 * sk_error_report callback of connection sockets.
 *
 * Logs the socket error (rate limited) together with the peer address
 * when it can be obtained, flags the sending connection for a reconnect
 * (delayed on ECONNREFUSED) and finally chains to the error report
 * callback that was saved from the listen socket.
 *
 * The IPv6 peer is printed with %pI6c; the address used to be printed as
 * four raw 32-bit words in dotted decimal, which is not an IPv6 address
 * notation.
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con;
	struct sockaddr_storage saddr;
	void (*orig_report)(struct sock *) = NULL;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con == NULL)
		goto out;

	orig_report = listen_sock.sk_error_report;
	if (con->sock == NULL ||
	    kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
		/* no peer address available */
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, dlm_config.ci_tcp_port,
				   sk->sk_err, sk->sk_err_soft);
	} else if (saddr.ss_family == AF_INET) {
		struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin4->sin_addr.s_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin6->sin6_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	}

	/* below sendcon only handling */
	if (test_bit(CF_IS_OTHERCON, &con->flags))
		con = con->sendcon;

	switch (sk->sk_err) {
	case ECONNREFUSED:
		set_bit(CF_DELAY_CONNECT, &con->flags);
		break;
	default:
		break;
	}

	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
		queue_work(send_workqueue, &con->swork);

out:
	read_unlock_bh(&sk->sk_callback_lock);
	/* chain to the saved callback outside of sk_callback_lock */
	if (orig_report)
		orig_report(sk);
}
626
627/* Note: sk_callback_lock must be locked before calling this function. */
cc661fc9 628static void save_listen_callbacks(struct socket *sock)
b81171cb 629{
cc661fc9
BP
630 struct sock *sk = sock->sk;
631
632 listen_sock.sk_data_ready = sk->sk_data_ready;
633 listen_sock.sk_state_change = sk->sk_state_change;
634 listen_sock.sk_write_space = sk->sk_write_space;
635 listen_sock.sk_error_report = sk->sk_error_report;
b81171cb
BP
636}
637
cc661fc9 638static void restore_callbacks(struct socket *sock)
b81171cb 639{
cc661fc9
BP
640 struct sock *sk = sock->sk;
641
b81171cb 642 write_lock_bh(&sk->sk_callback_lock);
b81171cb 643 sk->sk_user_data = NULL;
cc661fc9
BP
644 sk->sk_data_ready = listen_sock.sk_data_ready;
645 sk->sk_state_change = listen_sock.sk_state_change;
646 sk->sk_write_space = listen_sock.sk_write_space;
647 sk->sk_error_report = listen_sock.sk_error_report;
b81171cb 648 write_unlock_bh(&sk->sk_callback_lock);
b3a5bbfd
BP
649}
650
d11ccd45
AA
651static void add_listen_sock(struct socket *sock, struct listen_connection *con)
652{
653 struct sock *sk = sock->sk;
654
655 write_lock_bh(&sk->sk_callback_lock);
656 save_listen_callbacks(sock);
657 con->sock = sock;
658
659 sk->sk_user_data = con;
660 sk->sk_allocation = GFP_NOFS;
661 /* Install a data_ready callback */
662 sk->sk_data_ready = lowcomms_listen_data_ready;
663 write_unlock_bh(&sk->sk_callback_lock);
664}
665
fdda387f 666/* Make a socket active */
988419a9 667static void add_sock(struct socket *sock, struct connection *con)
fdda387f 668{
b81171cb
BP
669 struct sock *sk = sock->sk;
670
671 write_lock_bh(&sk->sk_callback_lock);
fdda387f
PC
672 con->sock = sock;
673
b81171cb 674 sk->sk_user_data = con;
fdda387f 675 /* Install a data_ready callback */
b81171cb
BP
676 sk->sk_data_ready = lowcomms_data_ready;
677 sk->sk_write_space = lowcomms_write_space;
678 sk->sk_state_change = lowcomms_state_change;
679 sk->sk_allocation = GFP_NOFS;
680 sk->sk_error_report = lowcomms_error_report;
681 write_unlock_bh(&sk->sk_callback_lock);
fdda387f
PC
682}
683
6ed7257b 684/* Add the port number to an IPv6 or 4 sockaddr and return the address
fdda387f
PC
685 length */
686static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
687 int *addr_len)
688{
6ed7257b 689 saddr->ss_family = dlm_local_addr[0]->ss_family;
ac33d071 690 if (saddr->ss_family == AF_INET) {
fdda387f
PC
691 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
692 in4_addr->sin_port = cpu_to_be16(port);
693 *addr_len = sizeof(struct sockaddr_in);
6ed7257b 694 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
ac33d071 695 } else {
fdda387f
PC
696 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
697 in6_addr->sin6_port = cpu_to_be16(port);
698 *addr_len = sizeof(struct sockaddr_in6);
699 }
01c8cab2 700 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
fdda387f
PC
701}
702
d11ccd45
AA
703static void dlm_close_sock(struct socket **sock)
704{
705 if (*sock) {
706 restore_callbacks(*sock);
707 sock_release(*sock);
708 *sock = NULL;
709 }
710}
711
fdda387f 712/* Close a remote connection and tidy up */
0d737a8c
MRL
713static void close_connection(struct connection *con, bool and_other,
714 bool tx, bool rx)
fdda387f 715{
b2a66629 716 bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
717
0aa18464 718 if (tx && !closing && cancel_work_sync(&con->swork)) {
0d737a8c 719 log_print("canceled swork for node %d", con->nodeid);
0aa18464 720 clear_bit(CF_WRITE_PENDING, &con->flags);
721 }
722 if (rx && !closing && cancel_work_sync(&con->rwork)) {
0d737a8c 723 log_print("canceled rwork for node %d", con->nodeid);
0aa18464 724 clear_bit(CF_READ_PENDING, &con->flags);
725 }
fdda387f 726
0d737a8c 727 mutex_lock(&con->sock_mutex);
d11ccd45
AA
728 dlm_close_sock(&con->sock);
729
fdda387f 730 if (con->othercon && and_other) {
ac33d071 731 /* Will only re-enter once. */
c6aa00e3 732 close_connection(con->othercon, false, tx, rx);
fdda387f 733 }
9e5f2825 734
4798cbbf 735 con->rx_leftover = 0;
61d96be0 736 con->retries = 0;
19633c7e 737 clear_bit(CF_CONNECTED, &con->flags);
ba868d9d
AA
738 clear_bit(CF_DELAY_CONNECT, &con->flags);
739 clear_bit(CF_RECONNECT, &con->flags);
8aa31cbf 740 clear_bit(CF_EOF, &con->flags);
61d96be0 741 mutex_unlock(&con->sock_mutex);
b2a66629 742 clear_bit(CF_CLOSING, &con->flags);
fdda387f
PC
743}
744
055923bf
AA
745static void shutdown_connection(struct connection *con)
746{
747 int ret;
748
eec054b5 749 flush_work(&con->swork);
055923bf
AA
750
751 mutex_lock(&con->sock_mutex);
752 /* nothing to shutdown */
753 if (!con->sock) {
754 mutex_unlock(&con->sock_mutex);
755 return;
756 }
757
758 set_bit(CF_SHUTDOWN, &con->flags);
759 ret = kernel_sock_shutdown(con->sock, SHUT_WR);
760 mutex_unlock(&con->sock_mutex);
761 if (ret) {
762 log_print("Connection %p failed to shutdown: %d will force close",
763 con, ret);
764 goto force_close;
765 } else {
766 ret = wait_event_timeout(con->shutdown_wait,
767 !test_bit(CF_SHUTDOWN, &con->flags),
768 DLM_SHUTDOWN_WAIT_TIMEOUT);
769 if (ret == 0) {
770 log_print("Connection %p shutdown timed out, will force close",
771 con);
772 goto force_close;
773 }
774 }
775
776 return;
777
778force_close:
779 clear_bit(CF_SHUTDOWN, &con->flags);
780 close_connection(con, false, true, true);
781}
782
783static void dlm_tcp_shutdown(struct connection *con)
784{
785 if (con->othercon)
786 shutdown_connection(con->othercon);
787 shutdown_connection(con);
788}
789
4798cbbf
AA
790static int con_realloc_receive_buf(struct connection *con, int newlen)
791{
792 unsigned char *newbuf;
793
794 newbuf = kmalloc(newlen, GFP_NOFS);
795 if (!newbuf)
796 return -ENOMEM;
797
798 /* copy any leftover from last receive */
799 if (con->rx_leftover)
800 memmove(newbuf, con->rx_buf, con->rx_leftover);
801
802 /* swap to new buffer space */
803 kfree(con->rx_buf);
804 con->rx_buflen = newlen;
805 con->rx_buf = newbuf;
806
807 return 0;
808}
809
fdda387f
PC
810/* Data received from remote end */
811static int receive_from_sock(struct connection *con)
812{
fdda387f 813 int call_again_soon = 0;
4798cbbf
AA
814 struct msghdr msg;
815 struct kvec iov;
816 int ret, buflen;
fdda387f 817
f1f1c1cc 818 mutex_lock(&con->sock_mutex);
fdda387f 819
a34fbc63
PC
820 if (con->sock == NULL) {
821 ret = -EAGAIN;
822 goto out_close;
823 }
4798cbbf 824
4798cbbf
AA
825 /* realloc if we get new buffer size to read out */
826 buflen = dlm_config.ci_buffer_size;
827 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
828 ret = con_realloc_receive_buf(con, buflen);
829 if (ret < 0)
fdda387f 830 goto out_resched;
fdda387f
PC
831 }
832
4798cbbf
AA
833 /* calculate new buffer parameter regarding last receive and
834 * possible leftover bytes
fdda387f 835 */
4798cbbf
AA
836 iov.iov_base = con->rx_buf + con->rx_leftover;
837 iov.iov_len = con->rx_buflen - con->rx_leftover;
fdda387f 838
4798cbbf
AA
839 memset(&msg, 0, sizeof(msg));
840 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
841 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
842 msg.msg_flags);
fdda387f
PC
843 if (ret <= 0)
844 goto out_close;
4798cbbf 845 else if (ret == iov.iov_len)
ee44b4bc 846 call_again_soon = 1;
bd44e2b0 847
4798cbbf
AA
848 /* new buflen according readed bytes and leftover from last receive */
849 buflen = ret + con->rx_leftover;
850 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
851 if (ret < 0)
852 goto out_close;
fdda387f 853
4798cbbf
AA
854 /* calculate leftover bytes from process and put it into begin of
855 * the receive buffer, so next receive we have the full message
856 * at the start address of the receive buffer.
857 */
858 con->rx_leftover = buflen - ret;
859 if (con->rx_leftover) {
860 memmove(con->rx_buf, con->rx_buf + ret,
861 con->rx_leftover);
862 call_again_soon = true;
fdda387f
PC
863 }
864
fdda387f
PC
865 if (call_again_soon)
866 goto out_resched;
4798cbbf 867
f1f1c1cc 868 mutex_unlock(&con->sock_mutex);
ac33d071 869 return 0;
fdda387f 870
ac33d071 871out_resched:
1d6e8131
PC
872 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
873 queue_work(recv_workqueue, &con->rwork);
f1f1c1cc 874 mutex_unlock(&con->sock_mutex);
bd44e2b0 875 return -EAGAIN;
fdda387f 876
ac33d071 877out_close:
ba868d9d 878 if (ret == 0) {
ba868d9d
AA
879 log_print("connection %p got EOF from %d",
880 con, con->nodeid);
8aa31cbf
AA
881
882 if (con->eof_condition && con->eof_condition(con)) {
883 set_bit(CF_EOF, &con->flags);
884 mutex_unlock(&con->sock_mutex);
885 } else {
886 mutex_unlock(&con->sock_mutex);
887 close_connection(con, false, true, false);
888
889 /* handling for tcp shutdown */
890 clear_bit(CF_SHUTDOWN, &con->flags);
891 wake_up(&con->shutdown_wait);
892 }
893
ba868d9d
AA
894 /* signal to breaking receive worker */
895 ret = -1;
8aa31cbf
AA
896 } else {
897 mutex_unlock(&con->sock_mutex);
fdda387f 898 }
fdda387f
PC
899 return ret;
900}
901
902/* Listening socket is busy, accept a connection */
d11ccd45 903static int accept_from_sock(struct listen_connection *con)
fdda387f
PC
904{
905 int result;
906 struct sockaddr_storage peeraddr;
907 struct socket *newsock;
b38bc9c2 908 int len, idx;
fdda387f
PC
909 int nodeid;
910 struct connection *newcon;
bd44e2b0 911 struct connection *addcon;
3f78cd7d 912 unsigned int mark;
fdda387f 913
513ef596 914 if (!dlm_allow_conn) {
513ef596
DT
915 return -1;
916 }
513ef596 917
d11ccd45 918 if (!con->sock)
3421fb15 919 return -ENOTCONN;
fdda387f 920
3421fb15 921 result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
fdda387f
PC
922 if (result < 0)
923 goto accept_err;
924
925 /* Get the connected socket's peer */
926 memset(&peeraddr, 0, sizeof(peeraddr));
9b2c45d4
DV
927 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
928 if (len < 0) {
fdda387f
PC
929 result = -ECONNABORTED;
930 goto accept_err;
931 }
932
933 /* Get the new node's NODEID */
934 make_sockaddr(&peeraddr, 0, &len);
e125fbeb 935 if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
bcaadf5c 936 unsigned char *b=(unsigned char *)&peeraddr;
617e82e1 937 log_print("connect from non cluster node");
bcaadf5c
MY
938 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
939 b, sizeof(struct sockaddr_storage));
fdda387f 940 sock_release(newsock);
fdda387f
PC
941 return -1;
942 }
943
944 log_print("got connection from %d", nodeid);
945
946 /* Check to see if we already have a connection to this node. This
947 * could happen if the two nodes initiate a connection at roughly
948 * the same time and the connections cross on the wire.
fdda387f
PC
949 * In this case we store the incoming one in "othercon"
950 */
b38bc9c2 951 idx = srcu_read_lock(&connections_srcu);
748285cc 952 newcon = nodeid2con(nodeid, GFP_NOFS);
fdda387f 953 if (!newcon) {
b38bc9c2 954 srcu_read_unlock(&connections_srcu, idx);
fdda387f
PC
955 result = -ENOMEM;
956 goto accept_err;
957 }
d11ccd45 958
e125fbeb
AA
959 sock_set_mark(newsock->sk, mark);
960
d11ccd45 961 mutex_lock(&newcon->sock_mutex);
fdda387f 962 if (newcon->sock) {
ac33d071 963 struct connection *othercon = newcon->othercon;
fdda387f
PC
964
965 if (!othercon) {
a47666eb 966 othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
fdda387f 967 if (!othercon) {
617e82e1 968 log_print("failed to allocate incoming socket");
f1f1c1cc 969 mutex_unlock(&newcon->sock_mutex);
b38bc9c2 970 srcu_read_unlock(&connections_srcu, idx);
fdda387f
PC
971 result = -ENOMEM;
972 goto accept_err;
973 }
4798cbbf 974
6cde210a
AA
975 result = dlm_con_init(othercon, nodeid);
976 if (result < 0) {
4798cbbf 977 kfree(othercon);
2fd8db2d 978 mutex_unlock(&newcon->sock_mutex);
b38bc9c2 979 srcu_read_unlock(&connections_srcu, idx);
4798cbbf
AA
980 goto accept_err;
981 }
982
e9a470ac 983 lockdep_set_subclass(&othercon->sock_mutex, 1);
7443bc96 984 set_bit(CF_IS_OTHERCON, &othercon->flags);
6cde210a 985 newcon->othercon = othercon;
ba868d9d 986 othercon->sendcon = newcon;
ba3ab3ca
AA
987 } else {
988 /* close other sock con if we have something new */
989 close_connection(othercon, false, true, false);
61d96be0 990 }
ba3ab3ca 991
e9a470ac 992 mutex_lock(&othercon->sock_mutex);
ba3ab3ca
AA
993 add_sock(newsock, othercon);
994 addcon = othercon;
995 mutex_unlock(&othercon->sock_mutex);
fdda387f
PC
996 }
997 else {
3735b4b9
BP
998 /* accept copies the sk after we've saved the callbacks, so we
999 don't want to save them a second time or comm errors will
1000 result in calling sk_error_report recursively. */
988419a9 1001 add_sock(newsock, newcon);
bd44e2b0 1002 addcon = newcon;
fdda387f
PC
1003 }
1004
b30a624f 1005 set_bit(CF_CONNECTED, &addcon->flags);
f1f1c1cc 1006 mutex_unlock(&newcon->sock_mutex);
fdda387f
PC
1007
1008 /*
1009 * Add it to the active queue in case we got data
25985edc 1010 * between processing the accept adding the socket
fdda387f
PC
1011 * to the read_sockets list
1012 */
bd44e2b0
PC
1013 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
1014 queue_work(recv_workqueue, &addcon->rwork);
fdda387f 1015
b38bc9c2
AA
1016 srcu_read_unlock(&connections_srcu, idx);
1017
fdda387f
PC
1018 return 0;
1019
ac33d071 1020accept_err:
3421fb15 1021 if (newsock)
1022 sock_release(newsock);
fdda387f
PC
1023
1024 if (result != -EAGAIN)
617e82e1 1025 log_print("error accepting connection from node: %d", result);
fdda387f
PC
1026 return result;
1027}
1028
8f2dc78d 1029static void dlm_page_release(struct kref *kref)
6ed7257b 1030{
8f2dc78d
AA
1031 struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
1032 ref);
1033
6ed7257b
PC
1034 __free_page(e->page);
1035 kfree(e);
1036}
1037
8f2dc78d
AA
1038static void dlm_msg_release(struct kref *kref)
1039{
1040 struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
1041
1042 kref_put(&msg->entry->ref, dlm_page_release);
1043 kfree(msg);
1044}
1045
1046static void free_entry(struct writequeue_entry *e)
1047{
1048 struct dlm_msg *msg, *tmp;
1049
1050 list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
2874d1a6
AA
1051 if (msg->orig_msg) {
1052 msg->orig_msg->retransmit = false;
1053 kref_put(&msg->orig_msg->ref, dlm_msg_release);
1054 }
8f2dc78d
AA
1055 list_del(&msg->list);
1056 kref_put(&msg->ref, dlm_msg_release);
1057 }
1058
1059 list_del(&e->list);
1060 atomic_dec(&e->con->writequeue_cnt);
1061 kref_put(&e->ref, dlm_page_release);
1062}
1063
5d689871
MC
1064/*
1065 * writequeue_entry_complete - try to delete and free write queue entry
1066 * @e: write queue entry to try to delete
1067 * @completed: bytes completed
1068 *
1069 * writequeue_lock must be held.
1070 */
1071static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1072{
1073 e->offset += completed;
1074 e->len -= completed;
1075
8f2dc78d 1076 if (e->len == 0 && e->users == 0)
5d689871 1077 free_entry(e);
5d689871
MC
1078}
1079
ee44b4bc
MRL
1080/*
1081 * sctp_bind_addrs - bind a SCTP socket to all our addresses
1082 */
13004e8a 1083static int sctp_bind_addrs(struct socket *sock, uint16_t port)
ee44b4bc
MRL
1084{
1085 struct sockaddr_storage localaddr;
c0425a42 1086 struct sockaddr *addr = (struct sockaddr *)&localaddr;
ee44b4bc
MRL
1087 int i, addr_len, result = 0;
1088
1089 for (i = 0; i < dlm_local_count; i++) {
1090 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1091 make_sockaddr(&localaddr, port, &addr_len);
1092
1093 if (!i)
13004e8a 1094 result = kernel_bind(sock, addr, addr_len);
ee44b4bc 1095 else
13004e8a 1096 result = sock_bind_add(sock->sk, addr, addr_len);
ee44b4bc
MRL
1097
1098 if (result < 0) {
1099 log_print("Can't bind to %d addr number %d, %d.\n",
1100 port, i + 1, result);
1101 break;
1102 }
1103 }
1104 return result;
1105}
1106
6ed7257b
PC
1107/* Initiate an SCTP association.
1108 This is a special case of send_to_sock() in that we don't yet have a
1109 peeled-off socket for this association, so we use the listening socket
1110 and add the primary IP address of the remote node.
1111 */
ee44b4bc 1112static void sctp_connect_to_sock(struct connection *con)
6ed7257b 1113{
ee44b4bc 1114 struct sockaddr_storage daddr;
ee44b4bc
MRL
1115 int result;
1116 int addr_len;
1117 struct socket *sock;
9c9f168f 1118 unsigned int mark;
ee44b4bc 1119
5d689871 1120 mutex_lock(&con->sock_mutex);
6ed7257b 1121
ee44b4bc
MRL
1122 /* Some odd races can cause double-connects, ignore them */
1123 if (con->retries++ > MAX_CONNECT_RETRIES)
1124 goto out;
1125
1126 if (con->sock) {
1127 log_print("node %d already connected.", con->nodeid);
1128 goto out;
1129 }
1130
1131 memset(&daddr, 0, sizeof(daddr));
e125fbeb 1132 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
ee44b4bc 1133 if (result < 0) {
6ed7257b 1134 log_print("no address for nodeid %d", con->nodeid);
ee44b4bc 1135 goto out;
6ed7257b 1136 }
6ed7257b 1137
ee44b4bc
MRL
1138 /* Create a socket to communicate with */
1139 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1140 SOCK_STREAM, IPPROTO_SCTP, &sock);
1141 if (result < 0)
1142 goto socket_err;
6ed7257b 1143
9c9f168f
AA
1144 sock_set_mark(sock->sk, mark);
1145
988419a9 1146 add_sock(sock, con);
6ed7257b 1147
ee44b4bc 1148 /* Bind to all addresses. */
13004e8a 1149 if (sctp_bind_addrs(con->sock, 0))
ee44b4bc 1150 goto bind_err;
6ed7257b 1151
ee44b4bc 1152 make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
6ed7257b 1153
2df6b762 1154 log_print_ratelimited("connecting to %d", con->nodeid);
6ed7257b 1155
ee44b4bc 1156 /* Turn off Nagle's algorithm */
40ef92c6 1157 sctp_sock_set_nodelay(sock->sk);
6ed7257b 1158
f706d830
GH
1159 /*
1160 * Make sock->ops->connect() function return in specified time,
1161 * since O_NONBLOCK argument in connect() function does not work here,
1162 * then, we should restore the default value of this attribute.
1163 */
76ee0785 1164 sock_set_sndtimeo(sock->sk, 5);
ee44b4bc 1165 result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
da3627c3 1166 0);
76ee0785 1167 sock_set_sndtimeo(sock->sk, 0);
f706d830 1168
ee44b4bc
MRL
1169 if (result == -EINPROGRESS)
1170 result = 0;
19633c7e
AA
1171 if (result == 0) {
1172 if (!test_and_set_bit(CF_CONNECTED, &con->flags))
1173 log_print("successful connected to node %d", con->nodeid);
ee44b4bc 1174 goto out;
19633c7e 1175 }
98e1b60e 1176
ee44b4bc
MRL
1177bind_err:
1178 con->sock = NULL;
1179 sock_release(sock);
6ed7257b 1180
ee44b4bc
MRL
1181socket_err:
1182 /*
1183 * Some errors are fatal and this list might need adjusting. For other
1184 * errors we try again until the max number of retries is reached.
1185 */
1186 if (result != -EHOSTUNREACH &&
1187 result != -ENETUNREACH &&
1188 result != -ENETDOWN &&
1189 result != -EINVAL &&
1190 result != -EPROTONOSUPPORT) {
1191 log_print("connect %d try %d error %d", con->nodeid,
1192 con->retries, result);
1193 mutex_unlock(&con->sock_mutex);
1194 msleep(1000);
ee44b4bc
MRL
1195 lowcomms_connect_sock(con);
1196 return;
6ed7257b 1197 }
5d689871 1198
ee44b4bc 1199out:
5d689871 1200 mutex_unlock(&con->sock_mutex);
6ed7257b
PC
1201}
1202
fdda387f 1203/* Connect a new socket to its peer */
6ed7257b 1204static void tcp_connect_to_sock(struct connection *con)
fdda387f 1205{
6bd8feda 1206 struct sockaddr_storage saddr, src_addr;
e125fbeb 1207 unsigned int mark;
fdda387f 1208 int addr_len;
a89d63a1 1209 struct socket *sock = NULL;
36b71a8b 1210 int result;
fdda387f 1211
f1f1c1cc 1212 mutex_lock(&con->sock_mutex);
fdda387f
PC
1213 if (con->retries++ > MAX_CONNECT_RETRIES)
1214 goto out;
1215
1216 /* Some odd races can cause double-connects, ignore them */
36b71a8b 1217 if (con->sock)
fdda387f 1218 goto out;
fdda387f
PC
1219
1220 /* Create a socket to communicate with */
eeb1bd5c
EB
1221 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1222 SOCK_STREAM, IPPROTO_TCP, &sock);
fdda387f
PC
1223 if (result < 0)
1224 goto out_err;
1225
1226 memset(&saddr, 0, sizeof(saddr));
e125fbeb 1227 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
36b71a8b
DT
1228 if (result < 0) {
1229 log_print("no address for nodeid %d", con->nodeid);
ac33d071 1230 goto out_err;
36b71a8b 1231 }
fdda387f 1232
e125fbeb
AA
1233 sock_set_mark(sock->sk, mark);
1234
988419a9 1235 add_sock(sock, con);
fdda387f 1236
6bd8feda
LH
1237 /* Bind to our cluster-known address connecting to avoid
1238 routing problems */
1239 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1240 make_sockaddr(&src_addr, 0, &addr_len);
1241 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1242 addr_len);
1243 if (result < 0) {
1244 log_print("could not bind for connect: %d", result);
1245 /* This *may* not indicate a critical error */
1246 }
1247
68c817a1 1248 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
fdda387f 1249
2df6b762 1250 log_print_ratelimited("connecting to %d", con->nodeid);
cb2d45da
DT
1251
1252 /* Turn off Nagle's algorithm */
12abc5ee 1253 tcp_sock_set_nodelay(sock->sk);
cb2d45da 1254
36b71a8b 1255 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
ac33d071 1256 O_NONBLOCK);
fdda387f
PC
1257 if (result == -EINPROGRESS)
1258 result = 0;
ac33d071
PC
1259 if (result == 0)
1260 goto out;
fdda387f 1261
ac33d071 1262out_err:
fdda387f
PC
1263 if (con->sock) {
1264 sock_release(con->sock);
1265 con->sock = NULL;
a89d63a1
CD
1266 } else if (sock) {
1267 sock_release(sock);
fdda387f
PC
1268 }
1269 /*
1270 * Some errors are fatal and this list might need adjusting. For other
1271 * errors we try again until the max number of retries is reached.
1272 */
36b71a8b
DT
1273 if (result != -EHOSTUNREACH &&
1274 result != -ENETUNREACH &&
1275 result != -ENETDOWN &&
1276 result != -EINVAL &&
1277 result != -EPROTONOSUPPORT) {
1278 log_print("connect %d try %d error %d", con->nodeid,
1279 con->retries, result);
1280 mutex_unlock(&con->sock_mutex);
1281 msleep(1000);
fdda387f 1282 lowcomms_connect_sock(con);
36b71a8b 1283 return;
fdda387f 1284 }
ac33d071 1285out:
f1f1c1cc 1286 mutex_unlock(&con->sock_mutex);
ac33d071 1287 return;
fdda387f
PC
1288}
1289
d11ccd45
AA
1290/* On error caller must run dlm_close_sock() for the
1291 * listen connection socket.
1292 */
1293static int tcp_create_listen_sock(struct listen_connection *con,
1294 struct sockaddr_storage *saddr)
fdda387f 1295{
ac33d071 1296 struct socket *sock = NULL;
fdda387f 1297 int result = 0;
fdda387f
PC
1298 int addr_len;
1299
6ed7257b 1300 if (dlm_local_addr[0]->ss_family == AF_INET)
fdda387f
PC
1301 addr_len = sizeof(struct sockaddr_in);
1302 else
1303 addr_len = sizeof(struct sockaddr_in6);
1304
1305 /* Create a socket to communicate with */
eeb1bd5c
EB
1306 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1307 SOCK_STREAM, IPPROTO_TCP, &sock);
fdda387f 1308 if (result < 0) {
617e82e1 1309 log_print("Can't create listening comms socket");
fdda387f
PC
1310 goto create_out;
1311 }
1312
a5b7ab63
AA
1313 sock_set_mark(sock->sk, dlm_config.ci_mark);
1314
cb2d45da 1315 /* Turn off Nagle's algorithm */
12abc5ee 1316 tcp_sock_set_nodelay(sock->sk);
cb2d45da 1317
b58f0e8f 1318 sock_set_reuseaddr(sock->sk);
6ed7257b 1319
d11ccd45 1320 add_listen_sock(sock, con);
fdda387f
PC
1321
1322 /* Bind to our port */
68c817a1 1323 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
fdda387f
PC
1324 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1325 if (result < 0) {
617e82e1 1326 log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
fdda387f
PC
1327 goto create_out;
1328 }
ce3d9544 1329 sock_set_keepalive(sock->sk);
fdda387f
PC
1330
1331 result = sock->ops->listen(sock, 5);
1332 if (result < 0) {
617e82e1 1333 log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
fdda387f
PC
1334 goto create_out;
1335 }
1336
d11ccd45
AA
1337 return 0;
1338
ac33d071 1339create_out:
d11ccd45 1340 return result;
fdda387f
PC
1341}
1342
6ed7257b
PC
1343/* Get local addresses */
1344static void init_local(void)
1345{
1346 struct sockaddr_storage sas, *addr;
1347 int i;
1348
30d3a237 1349 dlm_local_count = 0;
1b189b88 1350 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
6ed7257b
PC
1351 if (dlm_our_addr(&sas, i))
1352 break;
1353
5c93f56f 1354 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
6ed7257b
PC
1355 if (!addr)
1356 break;
6ed7257b
PC
1357 dlm_local_addr[dlm_local_count++] = addr;
1358 }
1359}
1360
043697f0
AA
1361static void deinit_local(void)
1362{
1363 int i;
1364
1365 for (i = 0; i < dlm_local_count; i++)
1366 kfree(dlm_local_addr[i]);
1367}
1368
d11ccd45
AA
1369/* Initialise SCTP socket and bind to all interfaces
1370 * On error caller must run dlm_close_sock() for the
1371 * listen connection socket.
1372 */
1373static int sctp_listen_for_all(struct listen_connection *con)
6ed7257b
PC
1374{
1375 struct socket *sock = NULL;
ee44b4bc 1376 int result = -EINVAL;
6ed7257b
PC
1377
1378 log_print("Using SCTP for communications");
1379
eeb1bd5c 1380 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
ee44b4bc 1381 SOCK_STREAM, IPPROTO_SCTP, &sock);
6ed7257b
PC
1382 if (result < 0) {
1383 log_print("Can't create comms socket, check SCTP is loaded");
1384 goto out;
1385 }
1386
26cfabf9 1387 sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
a5b7ab63 1388 sock_set_mark(sock->sk, dlm_config.ci_mark);
40ef92c6 1389 sctp_sock_set_nodelay(sock->sk);
86e92ad2 1390
d11ccd45 1391 add_listen_sock(sock, con);
b81171cb 1392
ee44b4bc 1393 /* Bind to all addresses. */
d11ccd45
AA
1394 result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port);
1395 if (result < 0)
1396 goto out;
6ed7257b
PC
1397
1398 result = sock->ops->listen(sock, 5);
1399 if (result < 0) {
1400 log_print("Can't set socket listening");
d11ccd45 1401 goto out;
6ed7257b
PC
1402 }
1403
1404 return 0;
1405
6ed7257b
PC
1406out:
1407 return result;
1408}
1409
1410static int tcp_listen_for_all(void)
fdda387f 1411{
fdda387f 1412 /* We don't support multi-homed hosts */
1a26bfaf 1413 if (dlm_local_count > 1) {
617e82e1
DT
1414 log_print("TCP protocol can't handle multi-homed hosts, "
1415 "try SCTP");
6ed7257b
PC
1416 return -EINVAL;
1417 }
1418
1419 log_print("Using TCP for communications");
1420
d11ccd45 1421 return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]);
fdda387f
PC
1422}
1423
1424
1425
1426static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1427 gfp_t allocation)
1428{
1429 struct writequeue_entry *entry;
1430
f0747ebf 1431 entry = kzalloc(sizeof(*entry), allocation);
fdda387f
PC
1432 if (!entry)
1433 return NULL;
1434
e1a7cbce 1435 entry->page = alloc_page(allocation | __GFP_ZERO);
fdda387f
PC
1436 if (!entry->page) {
1437 kfree(entry);
1438 return NULL;
1439 }
1440
fdda387f 1441 entry->con = con;
f0747ebf 1442 entry->users = 1;
8f2dc78d
AA
1443 kref_init(&entry->ref);
1444 INIT_LIST_HEAD(&entry->msgs);
fdda387f
PC
1445
1446 return entry;
1447}
1448
f0747ebf 1449static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
8f2dc78d
AA
1450 gfp_t allocation, char **ppc,
1451 void (*cb)(struct dlm_mhandle *mh),
1452 struct dlm_mhandle *mh)
f0747ebf
AA
1453{
1454 struct writequeue_entry *e;
1455
1456 spin_lock(&con->writequeue_lock);
1457 if (!list_empty(&con->writequeue)) {
1458 e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1459 if (DLM_WQ_REMAIN_BYTES(e) >= len) {
8f2dc78d
AA
1460 kref_get(&e->ref);
1461
f0747ebf 1462 *ppc = page_address(e->page) + e->end;
8f2dc78d
AA
1463 if (cb)
1464 cb(mh);
1465
f0747ebf
AA
1466 e->end += len;
1467 e->users++;
1468 spin_unlock(&con->writequeue_lock);
1469
1470 return e;
1471 }
1472 }
1473 spin_unlock(&con->writequeue_lock);
1474
1475 e = new_writequeue_entry(con, allocation);
1476 if (!e)
1477 return NULL;
1478
8f2dc78d 1479 kref_get(&e->ref);
f0747ebf
AA
1480 *ppc = page_address(e->page);
1481 e->end += len;
8aa31cbf 1482 atomic_inc(&con->writequeue_cnt);
f0747ebf
AA
1483
1484 spin_lock(&con->writequeue_lock);
8f2dc78d
AA
1485 if (cb)
1486 cb(mh);
1487
f0747ebf
AA
1488 list_add_tail(&e->list, &con->writequeue);
1489 spin_unlock(&con->writequeue_lock);
1490
1491 return e;
1492};
1493
2874d1a6
AA
1494static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1495 gfp_t allocation, char **ppc,
1496 void (*cb)(struct dlm_mhandle *mh),
1497 struct dlm_mhandle *mh)
1498{
1499 struct writequeue_entry *e;
1500 struct dlm_msg *msg;
1501
1502 msg = kzalloc(sizeof(*msg), allocation);
1503 if (!msg)
1504 return NULL;
1505
1506 kref_init(&msg->ref);
1507
1508 e = new_wq_entry(con, len, allocation, ppc, cb, mh);
1509 if (!e) {
1510 kfree(msg);
1511 return NULL;
1512 }
1513
1514 msg->ppc = *ppc;
1515 msg->len = len;
1516 msg->entry = e;
1517
1518 return msg;
1519}
1520
8f2dc78d
AA
1521struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
1522 char **ppc, void (*cb)(struct dlm_mhandle *mh),
1523 struct dlm_mhandle *mh)
fdda387f
PC
1524{
1525 struct connection *con;
8f2dc78d 1526 struct dlm_msg *msg;
b38bc9c2 1527 int idx;
fdda387f 1528
c45674fb
AA
1529 if (len > DEFAULT_BUFFER_SIZE ||
1530 len < sizeof(struct dlm_header)) {
1531 BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
692f51c8 1532 log_print("failed to allocate a buffer of size %d", len);
c45674fb 1533 WARN_ON(1);
692f51c8
AA
1534 return NULL;
1535 }
1536
b38bc9c2 1537 idx = srcu_read_lock(&connections_srcu);
fdda387f 1538 con = nodeid2con(nodeid, allocation);
b38bc9c2
AA
1539 if (!con) {
1540 srcu_read_unlock(&connections_srcu, idx);
fdda387f 1541 return NULL;
b38bc9c2
AA
1542 }
1543
2874d1a6 1544 msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
8f2dc78d
AA
1545 if (!msg) {
1546 srcu_read_unlock(&connections_srcu, idx);
1547 return NULL;
1548 }
1549
b38bc9c2 1550 /* we assume if successful commit must called */
8f2dc78d 1551 msg->idx = idx;
8f2dc78d 1552 return msg;
fdda387f
PC
1553}
1554
2874d1a6 1555static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
fdda387f 1556{
8f2dc78d 1557 struct writequeue_entry *e = msg->entry;
fdda387f
PC
1558 struct connection *con = e->con;
1559 int users;
1560
4edde74e 1561 spin_lock(&con->writequeue_lock);
8f2dc78d
AA
1562 kref_get(&msg->ref);
1563 list_add(&msg->list, &e->msgs);
1564
fdda387f
PC
1565 users = --e->users;
1566 if (users)
1567 goto out;
f0747ebf
AA
1568
1569 e->len = DLM_WQ_LENGTH_BYTES(e);
fdda387f
PC
1570 spin_unlock(&con->writequeue_lock);
1571
01da24d3 1572 queue_work(send_workqueue, &con->swork);
fdda387f
PC
1573 return;
1574
ac33d071 1575out:
fdda387f
PC
1576 spin_unlock(&con->writequeue_lock);
1577 return;
1578}
1579
2874d1a6
AA
1580void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1581{
1582 _dlm_lowcomms_commit_msg(msg);
1583 srcu_read_unlock(&connections_srcu, msg->idx);
1584}
1585
8f2dc78d
AA
1586void dlm_lowcomms_put_msg(struct dlm_msg *msg)
1587{
1588 kref_put(&msg->ref, dlm_msg_release);
1589}
1590
2874d1a6
AA
1591/* does not held connections_srcu, usage workqueue only */
1592int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
1593{
1594 struct dlm_msg *msg_resend;
1595 char *ppc;
1596
1597 if (msg->retransmit)
1598 return 1;
1599
1600 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
1601 GFP_ATOMIC, &ppc, NULL, NULL);
1602 if (!msg_resend)
1603 return -ENOMEM;
1604
1605 msg->retransmit = true;
1606 kref_get(&msg->ref);
1607 msg_resend->orig_msg = msg;
1608
1609 memcpy(ppc, msg->ppc, msg->len);
1610 _dlm_lowcomms_commit_msg(msg_resend);
1611 dlm_lowcomms_put_msg(msg_resend);
1612
1613 return 0;
1614}
1615
fdda387f 1616/* Send a message */
ac33d071 1617static void send_to_sock(struct connection *con)
fdda387f
PC
1618{
1619 int ret = 0;
fdda387f
PC
1620 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1621 struct writequeue_entry *e;
1622 int len, offset;
f92c8dd7 1623 int count = 0;
fdda387f 1624
f1f1c1cc 1625 mutex_lock(&con->sock_mutex);
fdda387f
PC
1626 if (con->sock == NULL)
1627 goto out_connect;
1628
fdda387f
PC
1629 spin_lock(&con->writequeue_lock);
1630 for (;;) {
f0747ebf 1631 if (list_empty(&con->writequeue))
fdda387f
PC
1632 break;
1633
f0747ebf 1634 e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
fdda387f
PC
1635 len = e->len;
1636 offset = e->offset;
1637 BUG_ON(len == 0 && e->users == 0);
1638 spin_unlock(&con->writequeue_lock);
1639
1640 ret = 0;
1641 if (len) {
1329e3f2
PB
1642 ret = kernel_sendpage(con->sock, e->page, offset, len,
1643 msg_flags);
d66f8277 1644 if (ret == -EAGAIN || ret == 0) {
b36930dd 1645 if (ret == -EAGAIN &&
9cd3e072 1646 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
b36930dd
DM
1647 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1648 /* Notify TCP that we're limited by the
1649 * application window size.
1650 */
1651 set_bit(SOCK_NOSPACE, &con->sock->flags);
1652 con->sock->sk->sk_write_pending++;
1653 }
d66f8277 1654 cond_resched();
fdda387f 1655 goto out;
9c5bef58 1656 } else if (ret < 0)
ba868d9d 1657 goto out;
d66f8277 1658 }
f92c8dd7
BP
1659
1660 /* Don't starve people filling buffers */
1661 if (++count >= MAX_SEND_MSG_COUNT) {
ac33d071 1662 cond_resched();
f92c8dd7
BP
1663 count = 0;
1664 }
fdda387f
PC
1665
1666 spin_lock(&con->writequeue_lock);
5d689871 1667 writequeue_entry_complete(e, ret);
fdda387f
PC
1668 }
1669 spin_unlock(&con->writequeue_lock);
8aa31cbf
AA
1670
1671 /* close if we got EOF */
1672 if (test_and_clear_bit(CF_EOF, &con->flags)) {
1673 mutex_unlock(&con->sock_mutex);
1674 close_connection(con, false, false, true);
1675
1676 /* handling for tcp shutdown */
1677 clear_bit(CF_SHUTDOWN, &con->flags);
1678 wake_up(&con->shutdown_wait);
1679 } else {
1680 mutex_unlock(&con->sock_mutex);
1681 }
1682
1683 return;
1684
ac33d071 1685out:
f1f1c1cc 1686 mutex_unlock(&con->sock_mutex);
ac33d071 1687 return;
fdda387f 1688
ac33d071 1689out_connect:
f1f1c1cc 1690 mutex_unlock(&con->sock_mutex);
01da24d3
BP
1691 queue_work(send_workqueue, &con->swork);
1692 cond_resched();
fdda387f
PC
1693}
1694
1695static void clean_one_writequeue(struct connection *con)
1696{
5e9ccc37 1697 struct writequeue_entry *e, *safe;
fdda387f
PC
1698
1699 spin_lock(&con->writequeue_lock);
5e9ccc37 1700 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
fdda387f
PC
1701 free_entry(e);
1702 }
1703 spin_unlock(&con->writequeue_lock);
1704}
1705
1706/* Called from recovery when it knows that a node has
1707 left the cluster */
1708int dlm_lowcomms_close(int nodeid)
1709{
1710 struct connection *con;
36b71a8b 1711 struct dlm_node_addr *na;
b38bc9c2 1712 int idx;
fdda387f 1713
fdda387f 1714 log_print("closing connection to node %d", nodeid);
b38bc9c2 1715 idx = srcu_read_lock(&connections_srcu);
fdda387f
PC
1716 con = nodeid2con(nodeid, 0);
1717 if (con) {
063c4c99 1718 set_bit(CF_CLOSE, &con->flags);
0d737a8c 1719 close_connection(con, true, true, true);
fdda387f 1720 clean_one_writequeue(con);
53a5edaa
AA
1721 if (con->othercon)
1722 clean_one_writequeue(con->othercon);
fdda387f 1723 }
b38bc9c2 1724 srcu_read_unlock(&connections_srcu, idx);
36b71a8b
DT
1725
1726 spin_lock(&dlm_node_addrs_spin);
1727 na = find_node_addr(nodeid);
1728 if (na) {
1729 list_del(&na->list);
1730 while (na->addr_count--)
1731 kfree(na->addr[na->addr_count]);
1732 kfree(na);
1733 }
1734 spin_unlock(&dlm_node_addrs_spin);
1735
fdda387f 1736 return 0;
fdda387f
PC
1737}
1738
6ed7257b 1739/* Receive workqueue function */
1d6e8131 1740static void process_recv_sockets(struct work_struct *work)
fdda387f 1741{
1d6e8131
PC
1742 struct connection *con = container_of(work, struct connection, rwork);
1743 int err;
fdda387f 1744
1d6e8131
PC
1745 clear_bit(CF_READ_PENDING, &con->flags);
1746 do {
d11ccd45 1747 err = receive_from_sock(con);
1d6e8131 1748 } while (!err);
fdda387f
PC
1749}
1750
d11ccd45
AA
1751static void process_listen_recv_socket(struct work_struct *work)
1752{
1753 accept_from_sock(&listen_con);
1754}
1755
6ed7257b 1756/* Send workqueue function */
1d6e8131 1757static void process_send_sockets(struct work_struct *work)
fdda387f 1758{
1d6e8131 1759 struct connection *con = container_of(work, struct connection, swork);
fdda387f 1760
7443bc96
AA
1761 WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
1762
8a4abb08 1763 clear_bit(CF_WRITE_PENDING, &con->flags);
ba868d9d 1764
489d8e55 1765 if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
ba868d9d 1766 close_connection(con, false, false, true);
489d8e55
AA
1767 dlm_midcomms_unack_msg_resend(con->nodeid);
1768 }
ba868d9d
AA
1769
1770 if (con->sock == NULL) { /* not mutex protected so check it inside too */
1771 if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
1772 msleep(1000);
6ed7257b 1773 con->connect_action(con);
ba868d9d 1774 }
01da24d3 1775 if (!list_empty(&con->writequeue))
063c4c99 1776 send_to_sock(con);
fdda387f
PC
1777}
1778
1d6e8131 1779static void work_stop(void)
fdda387f 1780{
b355516f
DW
1781 if (recv_workqueue)
1782 destroy_workqueue(recv_workqueue);
1783 if (send_workqueue)
1784 destroy_workqueue(send_workqueue);
fdda387f
PC
1785}
1786
1d6e8131 1787static int work_start(void)
fdda387f 1788{
e43f055a
DT
1789 recv_workqueue = alloc_workqueue("dlm_recv",
1790 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
b9d41052
NK
1791 if (!recv_workqueue) {
1792 log_print("can't start dlm_recv");
1793 return -ENOMEM;
fdda387f 1794 }
fdda387f 1795
e43f055a
DT
1796 send_workqueue = alloc_workqueue("dlm_send",
1797 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
b9d41052
NK
1798 if (!send_workqueue) {
1799 log_print("can't start dlm_send");
1d6e8131 1800 destroy_workqueue(recv_workqueue);
b9d41052 1801 return -ENOMEM;
fdda387f 1802 }
fdda387f
PC
1803
1804 return 0;
1805}
1806
9d232469
AA
1807static void shutdown_conn(struct connection *con)
1808{
1809 if (con->shutdown_action)
1810 con->shutdown_action(con);
1811}
1812
1813void dlm_lowcomms_shutdown(void)
1814{
b38bc9c2
AA
1815 int idx;
1816
9d232469
AA
1817 /* Set all the flags to prevent any
1818 * socket activity.
1819 */
1820 dlm_allow_conn = 0;
1821
1822 if (recv_workqueue)
1823 flush_workqueue(recv_workqueue);
1824 if (send_workqueue)
1825 flush_workqueue(send_workqueue);
1826
1827 dlm_close_sock(&listen_con.sock);
1828
b38bc9c2 1829 idx = srcu_read_lock(&connections_srcu);
9d232469 1830 foreach_conn(shutdown_conn);
b38bc9c2 1831 srcu_read_unlock(&connections_srcu, idx);
9d232469
AA
1832}
1833
f0fb83cb 1834static void _stop_conn(struct connection *con, bool and_other)
fdda387f 1835{
f0fb83cb 1836 mutex_lock(&con->sock_mutex);
173a31fe 1837 set_bit(CF_CLOSE, &con->flags);
f0fb83cb 1838 set_bit(CF_READ_PENDING, &con->flags);
8a4abb08 1839 set_bit(CF_WRITE_PENDING, &con->flags);
93eaadeb 1840 if (con->sock && con->sock->sk) {
1841 write_lock_bh(&con->sock->sk->sk_callback_lock);
5e9ccc37 1842 con->sock->sk->sk_user_data = NULL;
93eaadeb 1843 write_unlock_bh(&con->sock->sk->sk_callback_lock);
1844 }
f0fb83cb 1845 if (con->othercon && and_other)
1846 _stop_conn(con->othercon, false);
1847 mutex_unlock(&con->sock_mutex);
1848}
1849
1850static void stop_conn(struct connection *con)
1851{
1852 _stop_conn(con, true);
5e9ccc37 1853}
fdda387f 1854
4798cbbf
AA
1855static void connection_release(struct rcu_head *rcu)
1856{
1857 struct connection *con = container_of(rcu, struct connection, rcu);
1858
1859 kfree(con->rx_buf);
1860 kfree(con);
1861}
1862
5e9ccc37
CC
1863static void free_conn(struct connection *con)
1864{
0d737a8c 1865 close_connection(con, true, true, true);
a47666eb
AA
1866 spin_lock(&connections_lock);
1867 hlist_del_rcu(&con->list);
1868 spin_unlock(&connections_lock);
948c47e9
AA
1869 if (con->othercon) {
1870 clean_one_writequeue(con->othercon);
5cbec208
AA
1871 call_srcu(&connections_srcu, &con->othercon->rcu,
1872 connection_release);
948c47e9 1873 }
0de98432 1874 clean_one_writequeue(con);
5cbec208 1875 call_srcu(&connections_srcu, &con->rcu, connection_release);
5e9ccc37
CC
1876}
1877
f0fb83cb 1878static void work_flush(void)
1879{
b38bc9c2 1880 int ok;
f0fb83cb 1881 int i;
f0fb83cb 1882 struct connection *con;
1883
f0fb83cb 1884 do {
1885 ok = 1;
1886 foreach_conn(stop_conn);
b355516f
DW
1887 if (recv_workqueue)
1888 flush_workqueue(recv_workqueue);
1889 if (send_workqueue)
1890 flush_workqueue(send_workqueue);
f0fb83cb 1891 for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
a47666eb
AA
1892 hlist_for_each_entry_rcu(con, &connection_hash[i],
1893 list) {
f0fb83cb 1894 ok &= test_bit(CF_READ_PENDING, &con->flags);
8a4abb08 1895 ok &= test_bit(CF_WRITE_PENDING, &con->flags);
1896 if (con->othercon) {
f0fb83cb 1897 ok &= test_bit(CF_READ_PENDING,
1898 &con->othercon->flags);
8a4abb08 1899 ok &= test_bit(CF_WRITE_PENDING,
1900 &con->othercon->flags);
1901 }
f0fb83cb 1902 }
1903 }
1904 } while (!ok);
1905}
1906
5e9ccc37
CC
1907void dlm_lowcomms_stop(void)
1908{
b38bc9c2
AA
1909 int idx;
1910
1911 idx = srcu_read_lock(&connections_srcu);
f0fb83cb 1912 work_flush();
3a8db798 1913 foreach_conn(free_conn);
b38bc9c2 1914 srcu_read_unlock(&connections_srcu, idx);
1d6e8131 1915 work_stop();
043697f0 1916 deinit_local();
fdda387f
PC
1917}
1918
fdda387f
PC
1919int dlm_lowcomms_start(void)
1920{
6ed7257b 1921 int error = -EINVAL;
5e9ccc37
CC
1922 int i;
1923
1924 for (i = 0; i < CONN_HASH_SIZE; i++)
1925 INIT_HLIST_HEAD(&connection_hash[i]);
fdda387f 1926
6ed7257b
PC
1927 init_local();
1928 if (!dlm_local_count) {
617e82e1 1929 error = -ENOTCONN;
fdda387f 1930 log_print("no local IP address has been set");
513ef596 1931 goto fail;
fdda387f
PC
1932 }
1933
d11ccd45
AA
1934 INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
1935
513ef596
DT
1936 error = work_start();
1937 if (error)
a47666eb 1938 goto fail;
513ef596
DT
1939
1940 dlm_allow_conn = 1;
fdda387f 1941
fdda387f 1942 /* Start listening */
6ed7257b
PC
1943 if (dlm_config.ci_protocol == 0)
1944 error = tcp_listen_for_all();
1945 else
d11ccd45 1946 error = sctp_listen_for_all(&listen_con);
fdda387f
PC
1947 if (error)
1948 goto fail_unlisten;
1949
fdda387f
PC
1950 return 0;
1951
ac33d071 1952fail_unlisten:
513ef596 1953 dlm_allow_conn = 0;
d11ccd45 1954 dlm_close_sock(&listen_con.sock);
513ef596 1955fail:
fdda387f
PC
1956 return error;
1957}
36b71a8b
DT
1958
1959void dlm_lowcomms_exit(void)
1960{
1961 struct dlm_node_addr *na, *safe;
1962
1963 spin_lock(&dlm_node_addrs_spin);
1964 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1965 list_del(&na->list);
1966 while (na->addr_count--)
1967 kfree(na->addr[na->addr_count]);
1968 kfree(na);
1969 }
1970 spin_unlock(&dlm_node_addrs_spin);
1971}