// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/rculist.h>
#include <linux/random.h>

#include "rtrs-clt.h"
#include "rtrs-log.h"
#include "rtrs-clt-trace.h"

#define RTRS_CONNECT_TIMEOUT_MS 30000
/*
 * Wait a bit before trying to reconnect after a failure
 * in order to give server time to finish clean up which
 * leads to "false positives" failed reconnect attempts
 */
#define RTRS_RECONNECT_BACKOFF 1000
/*
 * Wait for additional random time between 0 and 8 seconds
 * before starting to reconnect to avoid clients reconnecting
 * all at once in case of a major network outage
 */
#define RTRS_RECONNECT_SEED 8

#define FIRST_CONN 0x01
/* limit to 128 * 4k = 512k max IO */
#define RTRS_MAX_SEGMENTS 128

MODULE_DESCRIPTION("RDMA Transport Client");
MODULE_LICENSE("GPL");

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
static struct rtrs_rdma_dev_pd dev_pd = {
	.ops = &dev_pd_ops
};

static struct workqueue_struct *rtrs_wq;
static const struct class rtrs_clt_dev_class = {
	.name = "rtrs-client",
};
static inline bool rtrs_clt_is_connected(const struct rtrs_clt_sess *clt)
{
	struct rtrs_clt_path *clt_path;
	bool connected = false;

	rcu_read_lock();
	list_for_each_entry_rcu(clt_path, &clt->paths_list, s.entry)
		if (READ_ONCE(clt_path->state) == RTRS_CLT_CONNECTED) {
			connected = true;
			break;
		}
	rcu_read_unlock();

	return connected;
}
static struct rtrs_permit *
__rtrs_get_permit(struct rtrs_clt_sess *clt, enum rtrs_clt_con_type con_type)
{
	size_t max_depth = clt->queue_depth;
	struct rtrs_permit *permit;
	int bit;

	/*
	 * Adapted from null_blk get_tag(). Callers from different cpus may
	 * grab the same bit, since find_first_zero_bit is not atomic.
	 * But then the test_and_set_bit_lock will fail for all the
	 * callers but one, so that they will loop again.
	 * This way an explicit spinlock is not required.
	 */
	do {
		bit = find_first_zero_bit(clt->permits_map, max_depth);
		if (bit >= max_depth)
			return NULL;
	} while (test_and_set_bit_lock(bit, clt->permits_map));

	permit = get_permit(clt, bit);
	WARN_ON(permit->mem_id != bit);
	permit->cpu_id = raw_smp_processor_id();
	permit->con_type = con_type;

	return permit;
}
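/*
 * Illustration of the lock-free claim in __rtrs_get_permit() above: if two
 * CPUs both observe bit 3 as the first zero bit, both call
 * test_and_set_bit_lock(3, clt->permits_map); exactly one sees the old value
 * of 0 and wins the permit, the other sees 1 and retries the do/while loop
 * until it either claims another free bit or runs out of queue depth.
 */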
static inline void __rtrs_put_permit(struct rtrs_clt_sess *clt,
				     struct rtrs_permit *permit)
{
	clear_bit_unlock(permit->mem_id, clt->permits_map);
}
/**
 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
 * @clt:	Current session
 * @con_type:	Type of connection to use with the permit
 * @can_wait:	Wait type
 *
 * Description:
 *    Allocates permit for the following RDMA operation. Permit is used
 *    to preallocate all resources and to propagate memory pressure
 *    up earlier.
 *
 * Context:
 *    Can sleep if @wait == RTRS_PERMIT_WAIT
 */
struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt_sess *clt,
					enum rtrs_clt_con_type con_type,
					enum wait_type can_wait)
{
	struct rtrs_permit *permit;
	DEFINE_WAIT(wait);

	permit = __rtrs_get_permit(clt, con_type);
	if (permit || !can_wait)
		return permit;

	do {
		prepare_to_wait(&clt->permits_wait, &wait,
				TASK_UNINTERRUPTIBLE);
		permit = __rtrs_get_permit(clt, con_type);
		if (permit)
			break;

		io_schedule();
	} while (1);

	finish_wait(&clt->permits_wait, &wait);

	return permit;
}
EXPORT_SYMBOL(rtrs_clt_get_permit);
/**
 * rtrs_clt_put_permit() - puts allocated permit
 * @clt:	Current session
 * @permit:	Permit to be freed
 */
void rtrs_clt_put_permit(struct rtrs_clt_sess *clt,
			 struct rtrs_permit *permit)
{
	if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
		return;

	__rtrs_put_permit(clt, permit);

	/*
	 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
	 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
	 * it must have added itself to &clt->permits_wait before
	 * __rtrs_put_permit() finished.
	 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
	 */
	if (waitqueue_active(&clt->permits_wait))
		wake_up(&clt->permits_wait);
}
EXPORT_SYMBOL(rtrs_clt_put_permit);
/**
 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
 * @clt_path: client path pointer
 * @permit: permit for the allocation of the RDMA buffer
 *
 * Note:
 *     IO connection starts from 1.
 *     0 connection is for user messages.
 */
static
struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_path *clt_path,
					    struct rtrs_permit *permit)
{
	int id = 0;

	if (permit->con_type == RTRS_IO_CON)
		id = (permit->cpu_id % (clt_path->s.irq_con_num - 1)) + 1;

	return to_clt_con(clt_path->s.con[id]);
}
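/*
 * Worked example of the mapping above (illustrative numbers): with
 * s.irq_con_num == 4 (connection 0 for user messages plus three IRQ IO
 * connections), a permit whose cpu_id is 5 is routed to
 * id = (5 % (4 - 1)) + 1 = 3, i.e. IO from that CPU always reuses the same
 * IO connection while connection 0 stays dedicated to user messages.
 */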
/**
 * rtrs_clt_change_state() - change the session state through session state
 * machine.
 *
 * @clt_path: client path to change the state of.
 * @new_state: state to change to.
 *
 * returns true if sess's state is changed to new state, otherwise return false.
 *
 * Locks:
 * state_wq lock must be held.
 */
static bool rtrs_clt_change_state(struct rtrs_clt_path *clt_path,
				  enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;
	bool changed = false;

	lockdep_assert_held(&clt_path->state_wq.lock);

	old_state = clt_path->state;
	switch (new_state) {
	case RTRS_CLT_CONNECTING:
		changed = (old_state == RTRS_CLT_RECONNECTING);
		break;
	case RTRS_CLT_RECONNECTING:
		changed = (old_state == RTRS_CLT_CONNECTED ||
			   old_state == RTRS_CLT_CONNECTING_ERR ||
			   old_state == RTRS_CLT_CLOSED);
		break;
	case RTRS_CLT_CONNECTED:
		changed = (old_state == RTRS_CLT_CONNECTING);
		break;
	case RTRS_CLT_CONNECTING_ERR:
		changed = (old_state == RTRS_CLT_CONNECTING);
		break;
	case RTRS_CLT_CLOSING:
		changed = (old_state == RTRS_CLT_CONNECTING ||
			   old_state == RTRS_CLT_CONNECTING_ERR ||
			   old_state == RTRS_CLT_RECONNECTING ||
			   old_state == RTRS_CLT_CONNECTED);
		break;
	case RTRS_CLT_CLOSED:
		changed = (old_state == RTRS_CLT_CLOSING);
		break;
	case RTRS_CLT_DEAD:
		changed = (old_state == RTRS_CLT_CLOSED);
		break;
	default:
		break;
	}
	if (changed) {
		clt_path->state = new_state;
		wake_up_locked(&clt_path->state_wq);
	}

	return changed;
}
static bool rtrs_clt_change_state_from_to(struct rtrs_clt_path *clt_path,
					  enum rtrs_clt_state old_state,
					  enum rtrs_clt_state new_state)
{
	bool changed = false;

	spin_lock_irq(&clt_path->state_wq.lock);
	if (clt_path->state == old_state)
		changed = rtrs_clt_change_state(clt_path, new_state);
	spin_unlock_irq(&clt_path->state_wq.lock);

	return changed;
}
static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_path *clt_path);

static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
{
	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);

	trace_rtrs_rdma_error_recovery(clt_path);

	if (rtrs_clt_change_state_from_to(clt_path,
					  RTRS_CLT_CONNECTED,
					  RTRS_CLT_RECONNECTING)) {
		queue_work(rtrs_wq, &clt_path->err_recovery_work);
	} else {
		/*
		 * Error can happen just on establishing new connection,
		 * so notify waiter with error state, waiter is responsible
		 * for cleaning the rest and reconnect if needed.
		 */
		rtrs_clt_change_state_from_to(clt_path,
					      RTRS_CLT_CONNECTING,
					      RTRS_CLT_CONNECTING_ERR);
	}
}
static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(con->c.path, "Failed IB_WR_REG_MR: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
}

static struct ib_cqe fast_reg_cqe = {
	.done = rtrs_clt_fast_reg_done
};
static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait);

static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_io_req *req =
		container_of(wc->wr_cqe, typeof(*req), inv_cqe);
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(con->c.path, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
	req->need_inv = false;
	if (req->need_inv_comp)
		complete(&req->inv_comp);
	else
		/* Complete request from INV callback */
		complete_rdma_req(req, req->inv_errno, true, false);
}
static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct ib_send_wr wr = {
		.opcode		    = IB_WR_LOCAL_INV,
		.wr_cqe		    = &req->inv_cqe,
		.send_flags	    = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = req->mr->rkey,
	};
	req->inv_cqe.done = rtrs_clt_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}
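/*
 * rtrs_inv_rkey() above posts a signalled IB_WR_LOCAL_INV work request:
 * once it completes, the rkey of req->mr can no longer be used by the peer
 * for RDMA.  Its completion is reported through req->inv_cqe, i.e.
 * rtrs_clt_inv_rkey_done(), which either wakes a waiter on req->inv_comp or
 * finishes the request itself.
 */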
static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_clt_path *clt_path;
	int err;

	if (WARN_ON(!req->con))
		return;
	clt_path = to_clt_path(con->c.path);

	if (req->sg_cnt) {
		if (req->dir == DMA_FROM_DEVICE && req->need_inv) {
			/*
			 * We are here to invalidate read requests
			 * ourselves. In normal scenario server should
			 * send INV for all read requests, but
			 * we are here, thus two things could happen:
			 *
			 *      1.  this is failover, when errno != 0
			 *          and can_wait == 1,
			 *
			 *      2.  something totally bad happened and
			 *          server forgot to send INV, so we
			 *          should do that ourselves.
			 */

			if (can_wait) {
				req->need_inv_comp = true;
			} else {
				/* This should be IO path, so always notify */
				WARN_ON(!notify);
				/* Save errno for INV callback */
				req->inv_errno = errno;
			}

			refcount_inc(&req->ref);
			err = rtrs_inv_rkey(req);
			if (err) {
				rtrs_err(con->c.path, "Send INV WR key=%#x: %d\n",
					 req->mr->rkey, err);
			} else if (can_wait) {
				wait_for_completion(&req->inv_comp);
			} else {
				/*
				 * Something went wrong, so request will be
				 * completed from INV callback.
				 */
				return;
			}
			if (!refcount_dec_and_test(&req->ref))
				return;
		}
		ib_dma_unmap_sg(clt_path->s.dev->ib_dev, req->sglist,
				req->sg_cnt, req->dir);
	}
	if (!refcount_dec_and_test(&req->ref))
		return;
	if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
		atomic_dec(&clt_path->stats->inflight);

	req->in_use = false;
	req->con = NULL;

	if (errno) {
		rtrs_err_rl(con->c.path, "IO request failed: error=%d path=%s [%s:%u] notify=%d\n",
			    errno, kobject_name(&clt_path->kobj), clt_path->hca_name,
			    clt_path->hca_port, notify);
	}

	if (notify)
		req->conf(req->priv, errno);
}
static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
			       struct rtrs_clt_io_req *req,
			       struct rtrs_rbuf *rbuf, u32 off,
			       u32 imm, struct ib_send_wr *wr)
{
	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
	enum ib_send_flags flags;
	struct ib_sge sge;

	if (!req->sg_size) {
		rtrs_wrn(con->c.path,
			 "Doing RDMA Write failed, no data supplied\n");
		return -EINVAL;
	}

	/* user data and user message in the first list element */
	sge.addr   = req->iu->dma_addr;
	sge.length = req->sg_size;
	sge.lkey   = clt_path->s.dev->ib_pd->local_dma_lkey;

	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = atomic_inc_return(&con->c.wr_cnt) % clt_path->s.signal_interval ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(clt_path->s.dev->ib_dev,
				      req->iu->dma_addr,
				      req->sg_size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
					   rbuf->rkey, rbuf->addr + off,
					   imm, flags, wr, NULL);
}
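/*
 * Example of the signalled-send pacing used above (illustrative value): with
 * s.signal_interval == 32, sends whose wr_cnt is not a multiple of 32 are
 * posted unsignalled, and every 32nd send carries IB_SEND_SIGNALED, so the
 * send queue is drained of completions often enough that it never fills up.
 */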
static void process_io_rsp(struct rtrs_clt_path *clt_path, u32 msg_id,
			   s16 errno, bool w_inval)
{
	struct rtrs_clt_io_req *req;

	if (WARN_ON(msg_id >= clt_path->queue_depth))
		return;

	req = &clt_path->reqs[msg_id];
	/* Drop need_inv if server responded with send with invalidation */
	req->need_inv &= !w_inval;
	complete_rdma_req(req, errno, true, false);
}
static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_iu *iu;
	int err;
	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);

	WARN_ON((clt_path->flags & RTRS_MSG_NEW_RKEY_F) == 0);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	err = rtrs_iu_post_recv(&con->c, iu);
	if (err) {
		rtrs_err(con->c.path, "post iu failed %d\n", err);
		rtrs_rdma_error_recovery(con);
	}
}
524 static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con
*con
, struct ib_wc
*wc
)
526 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
527 struct rtrs_msg_rkey_rsp
*msg
;
528 u32 imm_type
, imm_payload
;
529 bool w_inval
= false;
534 WARN_ON((clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
) == 0);
536 iu
= container_of(wc
->wr_cqe
, struct rtrs_iu
, cqe
);
538 if (wc
->byte_len
< sizeof(*msg
)) {
539 rtrs_err(con
->c
.path
, "rkey response is malformed: size %d\n",
543 ib_dma_sync_single_for_cpu(clt_path
->s
.dev
->ib_dev
, iu
->dma_addr
,
544 iu
->size
, DMA_FROM_DEVICE
);
546 if (le16_to_cpu(msg
->type
) != RTRS_MSG_RKEY_RSP
) {
547 rtrs_err(clt_path
->clt
,
548 "rkey response is malformed: type %d\n",
549 le16_to_cpu(msg
->type
));
552 buf_id
= le16_to_cpu(msg
->buf_id
);
553 if (WARN_ON(buf_id
>= clt_path
->queue_depth
))
556 rtrs_from_imm(be32_to_cpu(wc
->ex
.imm_data
), &imm_type
, &imm_payload
);
557 if (imm_type
== RTRS_IO_RSP_IMM
||
558 imm_type
== RTRS_IO_RSP_W_INV_IMM
) {
561 w_inval
= (imm_type
== RTRS_IO_RSP_W_INV_IMM
);
562 rtrs_from_io_rsp_imm(imm_payload
, &msg_id
, &err
);
564 if (WARN_ON(buf_id
!= msg_id
))
566 clt_path
->rbufs
[buf_id
].rkey
= le32_to_cpu(msg
->rkey
);
567 process_io_rsp(clt_path
, msg_id
, err
, w_inval
);
569 ib_dma_sync_single_for_device(clt_path
->s
.dev
->ib_dev
, iu
->dma_addr
,
570 iu
->size
, DMA_FROM_DEVICE
);
571 return rtrs_clt_recv_done(con
, wc
);
573 rtrs_rdma_error_recovery(con
);
576 static void rtrs_clt_rdma_done(struct ib_cq
*cq
, struct ib_wc
*wc
);
578 static struct ib_cqe io_comp_cqe
= {
579 .done
= rtrs_clt_rdma_done
583 * Post x2 empty WRs: first is for this RDMA with IMM,
584 * second is for RECV with INV, which happened earlier.
586 static int rtrs_post_recv_empty_x2(struct rtrs_con
*con
, struct ib_cqe
*cqe
)
588 struct ib_recv_wr wr_arr
[2], *wr
;
591 memset(wr_arr
, 0, sizeof(wr_arr
));
592 for (i
= 0; i
< ARRAY_SIZE(wr_arr
); i
++) {
596 /* Chain backwards */
597 wr
->next
= &wr_arr
[i
- 1];
600 return ib_post_recv(con
->qp
, wr
, NULL
);
603 static void rtrs_clt_rdma_done(struct ib_cq
*cq
, struct ib_wc
*wc
)
605 struct rtrs_clt_con
*con
= to_clt_con(wc
->qp
->qp_context
);
606 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
607 u32 imm_type
, imm_payload
;
608 bool w_inval
= false;
611 if (wc
->status
!= IB_WC_SUCCESS
) {
612 if (wc
->status
!= IB_WC_WR_FLUSH_ERR
) {
613 rtrs_err(clt_path
->clt
, "RDMA failed: %s\n",
614 ib_wc_status_msg(wc
->status
));
615 rtrs_rdma_error_recovery(con
);
619 rtrs_clt_update_wc_stats(con
);
621 switch (wc
->opcode
) {
622 case IB_WC_RECV_RDMA_WITH_IMM
:
624 * post_recv() RDMA write completions of IO reqs (read/write)
627 if (WARN_ON(wc
->wr_cqe
->done
!= rtrs_clt_rdma_done
))
629 rtrs_from_imm(be32_to_cpu(wc
->ex
.imm_data
),
630 &imm_type
, &imm_payload
);
631 if (imm_type
== RTRS_IO_RSP_IMM
||
632 imm_type
== RTRS_IO_RSP_W_INV_IMM
) {
635 w_inval
= (imm_type
== RTRS_IO_RSP_W_INV_IMM
);
636 rtrs_from_io_rsp_imm(imm_payload
, &msg_id
, &err
);
638 process_io_rsp(clt_path
, msg_id
, err
, w_inval
);
639 } else if (imm_type
== RTRS_HB_MSG_IMM
) {
641 rtrs_send_hb_ack(&clt_path
->s
);
642 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
)
643 return rtrs_clt_recv_done(con
, wc
);
644 } else if (imm_type
== RTRS_HB_ACK_IMM
) {
646 clt_path
->s
.hb_missed_cnt
= 0;
647 clt_path
->s
.hb_cur_latency
=
648 ktime_sub(ktime_get(), clt_path
->s
.hb_last_sent
);
649 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
)
650 return rtrs_clt_recv_done(con
, wc
);
652 rtrs_wrn(con
->c
.path
, "Unknown IMM type %u\n",
657 * Post x2 empty WRs: first is for this RDMA with IMM,
658 * second is for RECV with INV, which happened earlier.
660 err
= rtrs_post_recv_empty_x2(&con
->c
, &io_comp_cqe
);
662 err
= rtrs_post_recv_empty(&con
->c
, &io_comp_cqe
);
664 rtrs_err(con
->c
.path
, "rtrs_post_recv_empty(): %d\n",
666 rtrs_rdma_error_recovery(con
);
671 * Key invalidations from server side
673 WARN_ON(!(wc
->wc_flags
& IB_WC_WITH_INVALIDATE
||
674 wc
->wc_flags
& IB_WC_WITH_IMM
));
675 WARN_ON(wc
->wr_cqe
->done
!= rtrs_clt_rdma_done
);
676 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
) {
677 if (wc
->wc_flags
& IB_WC_WITH_INVALIDATE
)
678 return rtrs_clt_recv_done(con
, wc
);
680 return rtrs_clt_rkey_rsp_done(con
, wc
);
683 case IB_WC_RDMA_WRITE
:
685 * post_send() RDMA write completions of IO reqs (read/write)
691 rtrs_wrn(clt_path
->clt
, "Unexpected WC type: %d\n", wc
->opcode
);
696 static int post_recv_io(struct rtrs_clt_con
*con
, size_t q_size
)
699 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
701 for (i
= 0; i
< q_size
; i
++) {
702 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
) {
703 struct rtrs_iu
*iu
= &con
->rsp_ius
[i
];
705 err
= rtrs_iu_post_recv(&con
->c
, iu
);
707 err
= rtrs_post_recv_empty(&con
->c
, &io_comp_cqe
);
716 static int post_recv_path(struct rtrs_clt_path
*clt_path
)
721 for (cid
= 0; cid
< clt_path
->s
.con_num
; cid
++) {
723 q_size
= SERVICE_CON_QUEUE_DEPTH
;
725 q_size
= clt_path
->queue_depth
;
728 * x2 for RDMA read responses + FR key invalidations,
729 * RDMA writes do not require any FR registrations.
733 err
= post_recv_io(to_clt_con(clt_path
->s
.con
[cid
]), q_size
);
735 rtrs_err(clt_path
->clt
, "post_recv_io(), err: %d\n",
746 struct list_head skip_list
;
747 struct rtrs_clt_sess
*clt
;
748 struct rtrs_clt_path
*(*next_path
)(struct path_it
*it
);
752 * rtrs_clt_get_next_path_or_null - get clt path from the list or return NULL
753 * @head: the head for the list.
754 * @clt_path: The element to take the next clt_path from.
756 * Next clt path returned in round-robin fashion, i.e. head will be skipped,
757 * but if list is observed as empty, NULL will be returned.
759 * This function may safely run concurrently with the _rcu list-mutation
760 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
762 static inline struct rtrs_clt_path
*
763 rtrs_clt_get_next_path_or_null(struct list_head
*head
, struct rtrs_clt_path
*clt_path
)
765 return list_next_or_null_rcu(head
, &clt_path
->s
.entry
, typeof(*clt_path
), s
.entry
) ?:
766 list_next_or_null_rcu(head
,
767 READ_ONCE((&clt_path
->s
.entry
)->next
),
768 typeof(*clt_path
), s
.entry
);
772 * get_next_path_rr() - Returns path in round-robin fashion.
773 * @it: the path pointer
775 * Related to @MP_POLICY_RR
778 * rcu_read_lock() must be held.
780 static struct rtrs_clt_path
*get_next_path_rr(struct path_it
*it
)
782 struct rtrs_clt_path __rcu
**ppcpu_path
;
783 struct rtrs_clt_path
*path
;
784 struct rtrs_clt_sess
*clt
;
787 * Assert that rcu lock must be held
789 RCU_LOCKDEP_WARN(!rcu_read_lock_held(), "no rcu read lock held");
794 * Here we use two RCU objects: @paths_list and @pcpu_path
795 * pointer. See rtrs_clt_remove_path_from_arr() for details
796 * how that is handled.
799 ppcpu_path
= this_cpu_ptr(clt
->pcpu_path
);
800 path
= rcu_dereference(*ppcpu_path
);
802 path
= list_first_or_null_rcu(&clt
->paths_list
,
803 typeof(*path
), s
.entry
);
805 path
= rtrs_clt_get_next_path_or_null(&clt
->paths_list
, path
);
807 rcu_assign_pointer(*ppcpu_path
, path
);
813 * get_next_path_min_inflight() - Returns path with minimal inflight count.
814 * @it: the path pointer
816 * Related to @MP_POLICY_MIN_INFLIGHT
819 * rcu_read_lock() must be hold.
821 static struct rtrs_clt_path
*get_next_path_min_inflight(struct path_it
*it
)
823 struct rtrs_clt_path
*min_path
= NULL
;
824 struct rtrs_clt_sess
*clt
= it
->clt
;
825 struct rtrs_clt_path
*clt_path
;
826 int min_inflight
= INT_MAX
;
829 list_for_each_entry_rcu(clt_path
, &clt
->paths_list
, s
.entry
) {
830 if (READ_ONCE(clt_path
->state
) != RTRS_CLT_CONNECTED
)
833 if (!list_empty(raw_cpu_ptr(clt_path
->mp_skip_entry
)))
836 inflight
= atomic_read(&clt_path
->stats
->inflight
);
838 if (inflight
< min_inflight
) {
839 min_inflight
= inflight
;
845 * add the path to the skip list, so that next time we can get
849 list_add(raw_cpu_ptr(min_path
->mp_skip_entry
), &it
->skip_list
);
855 * get_next_path_min_latency() - Returns path with minimal latency.
856 * @it: the path pointer
858 * Return: a path with the lowest latency or NULL if all paths are tried
861 * rcu_read_lock() must be hold.
863 * Related to @MP_POLICY_MIN_LATENCY
865 * This DOES skip an already-tried path.
866 * There is a skip-list to skip a path if the path has tried but failed.
867 * It will try the minimum latency path and then the second minimum latency
868 * path and so on. Finally it will return NULL if all paths are tried.
869 * Therefore the caller MUST check the returned
870 * path is NULL and trigger the IO error.
872 static struct rtrs_clt_path
*get_next_path_min_latency(struct path_it
*it
)
874 struct rtrs_clt_path
*min_path
= NULL
;
875 struct rtrs_clt_sess
*clt
= it
->clt
;
876 struct rtrs_clt_path
*clt_path
;
877 ktime_t min_latency
= KTIME_MAX
;
880 list_for_each_entry_rcu(clt_path
, &clt
->paths_list
, s
.entry
) {
881 if (READ_ONCE(clt_path
->state
) != RTRS_CLT_CONNECTED
)
884 if (!list_empty(raw_cpu_ptr(clt_path
->mp_skip_entry
)))
887 latency
= clt_path
->s
.hb_cur_latency
;
889 if (latency
< min_latency
) {
890 min_latency
= latency
;
896 * add the path to the skip list, so that next time we can get
900 list_add(raw_cpu_ptr(min_path
->mp_skip_entry
), &it
->skip_list
);
905 static inline void path_it_init(struct path_it
*it
, struct rtrs_clt_sess
*clt
)
907 INIT_LIST_HEAD(&it
->skip_list
);
911 if (clt
->mp_policy
== MP_POLICY_RR
)
912 it
->next_path
= get_next_path_rr
;
913 else if (clt
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
914 it
->next_path
= get_next_path_min_inflight
;
916 it
->next_path
= get_next_path_min_latency
;
919 static inline void path_it_deinit(struct path_it
*it
)
921 struct list_head
*skip
, *tmp
;
923 * The skip_list is used only for the MIN_INFLIGHT and MIN_LATENCY policies.
924 * We need to remove paths from it, so that next IO can insert
925 * paths (->mp_skip_entry) into a skip_list again.
927 list_for_each_safe(skip
, tmp
, &it
->skip_list
)
932 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
933 * about an inflight IO.
934 * The user buffer holding user control message (not data) is copied into
935 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
936 * also hold the control message of rtrs.
937 * @req: an io request holding information about IO.
938 * @clt_path: client path
939 * @conf: conformation callback function to notify upper layer.
940 * @permit: permit for allocation of RDMA remote buffer
941 * @priv: private pointer
942 * @vec: kernel vector containing control message
943 * @usr_len: length of the user message
944 * @sg: scater list for IO data
945 * @sg_cnt: number of scater list entries
946 * @data_len: length of the IO data
947 * @dir: direction of the IO.
949 static void rtrs_clt_init_req(struct rtrs_clt_io_req
*req
,
950 struct rtrs_clt_path
*clt_path
,
951 void (*conf
)(void *priv
, int errno
),
952 struct rtrs_permit
*permit
, void *priv
,
953 const struct kvec
*vec
, size_t usr_len
,
954 struct scatterlist
*sg
, size_t sg_cnt
,
955 size_t data_len
, int dir
)
957 struct iov_iter iter
;
960 req
->permit
= permit
;
962 req
->usr_len
= usr_len
;
963 req
->data_len
= data_len
;
965 req
->sg_cnt
= sg_cnt
;
968 req
->con
= rtrs_permit_to_clt_con(clt_path
, permit
);
970 req
->need_inv
= false;
971 req
->need_inv_comp
= false;
973 refcount_set(&req
->ref
, 1);
974 req
->mp_policy
= clt_path
->clt
->mp_policy
;
976 iov_iter_kvec(&iter
, ITER_SOURCE
, vec
, 1, usr_len
);
977 len
= _copy_from_iter(req
->iu
->buf
, usr_len
, &iter
);
978 WARN_ON(len
!= usr_len
);
980 reinit_completion(&req
->inv_comp
);
983 static struct rtrs_clt_io_req
*
984 rtrs_clt_get_req(struct rtrs_clt_path
*clt_path
,
985 void (*conf
)(void *priv
, int errno
),
986 struct rtrs_permit
*permit
, void *priv
,
987 const struct kvec
*vec
, size_t usr_len
,
988 struct scatterlist
*sg
, size_t sg_cnt
,
989 size_t data_len
, int dir
)
991 struct rtrs_clt_io_req
*req
;
993 req
= &clt_path
->reqs
[permit
->mem_id
];
994 rtrs_clt_init_req(req
, clt_path
, conf
, permit
, priv
, vec
, usr_len
,
995 sg
, sg_cnt
, data_len
, dir
);
999 static struct rtrs_clt_io_req
*
1000 rtrs_clt_get_copy_req(struct rtrs_clt_path
*alive_path
,
1001 struct rtrs_clt_io_req
*fail_req
)
1003 struct rtrs_clt_io_req
*req
;
1005 .iov_base
= fail_req
->iu
->buf
,
1006 .iov_len
= fail_req
->usr_len
1009 req
= &alive_path
->reqs
[fail_req
->permit
->mem_id
];
1010 rtrs_clt_init_req(req
, alive_path
, fail_req
->conf
, fail_req
->permit
,
1011 fail_req
->priv
, &vec
, fail_req
->usr_len
,
1012 fail_req
->sglist
, fail_req
->sg_cnt
,
1013 fail_req
->data_len
, fail_req
->dir
);
1017 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con
*con
,
1018 struct rtrs_clt_io_req
*req
,
1019 struct rtrs_rbuf
*rbuf
, bool fr_en
,
1020 u32 count
, u32 size
, u32 imm
,
1021 struct ib_send_wr
*wr
,
1022 struct ib_send_wr
*tail
)
1024 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1025 struct ib_sge
*sge
= req
->sge
;
1026 enum ib_send_flags flags
;
1027 struct scatterlist
*sg
;
1030 struct ib_send_wr
*ptail
= NULL
;
1034 sge
[i
].addr
= req
->mr
->iova
;
1035 sge
[i
].length
= req
->mr
->length
;
1036 sge
[i
].lkey
= req
->mr
->lkey
;
1041 for_each_sg(req
->sglist
, sg
, count
, i
) {
1042 sge
[i
].addr
= sg_dma_address(sg
);
1043 sge
[i
].length
= sg_dma_len(sg
);
1044 sge
[i
].lkey
= clt_path
->s
.dev
->ib_pd
->local_dma_lkey
;
1046 num_sge
= 1 + count
;
1048 sge
[i
].addr
= req
->iu
->dma_addr
;
1049 sge
[i
].length
= size
;
1050 sge
[i
].lkey
= clt_path
->s
.dev
->ib_pd
->local_dma_lkey
;
1053 * From time to time we have to post signalled sends,
1054 * or send queue will fill up and only QP reset can help.
1056 flags
= atomic_inc_return(&con
->c
.wr_cnt
) % clt_path
->s
.signal_interval
?
1057 0 : IB_SEND_SIGNALED
;
1059 ib_dma_sync_single_for_device(clt_path
->s
.dev
->ib_dev
,
1061 size
, DMA_TO_DEVICE
);
1063 return rtrs_iu_post_rdma_write_imm(&con
->c
, req
->iu
, sge
, num_sge
,
1064 rbuf
->rkey
, rbuf
->addr
, imm
,
1068 static int rtrs_map_sg_fr(struct rtrs_clt_io_req
*req
, size_t count
)
1072 /* Align the MR to a 4K page size to match the block virt boundary */
1073 nr
= ib_map_mr_sg(req
->mr
, req
->sglist
, count
, NULL
, SZ_4K
);
1075 return nr
< 0 ? nr
: -EINVAL
;
1076 ib_update_fast_reg_key(req
->mr
, ib_inc_rkey(req
->mr
->rkey
));
1081 static int rtrs_clt_write_req(struct rtrs_clt_io_req
*req
)
1083 struct rtrs_clt_con
*con
= req
->con
;
1084 struct rtrs_path
*s
= con
->c
.path
;
1085 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
1086 struct rtrs_msg_rdma_write
*msg
;
1088 struct rtrs_rbuf
*rbuf
;
1091 struct ib_reg_wr rwr
;
1092 struct ib_send_wr inv_wr
;
1093 struct ib_send_wr
*wr
= NULL
;
1096 const size_t tsize
= sizeof(*msg
) + req
->data_len
+ req
->usr_len
;
1098 if (tsize
> clt_path
->chunk_size
) {
1099 rtrs_wrn(s
, "Write request failed, size too big %zu > %d\n",
1100 tsize
, clt_path
->chunk_size
);
1104 count
= ib_dma_map_sg(clt_path
->s
.dev
->ib_dev
, req
->sglist
,
1105 req
->sg_cnt
, req
->dir
);
1107 rtrs_wrn(s
, "Write request failed, map failed\n");
1111 /* put rtrs msg after sg and user message */
1112 msg
= req
->iu
->buf
+ req
->usr_len
;
1113 msg
->type
= cpu_to_le16(RTRS_MSG_WRITE
);
1114 msg
->usr_len
= cpu_to_le16(req
->usr_len
);
1116 /* rtrs message on server side will be after user data and message */
1117 imm
= req
->permit
->mem_off
+ req
->data_len
+ req
->usr_len
;
1118 imm
= rtrs_to_io_req_imm(imm
);
1119 buf_id
= req
->permit
->mem_id
;
1120 req
->sg_size
= tsize
;
1121 rbuf
= &clt_path
->rbufs
[buf_id
];
1124 ret
= rtrs_map_sg_fr(req
, count
);
1127 "Write request failed, failed to map fast reg. data, err: %d\n",
1129 ib_dma_unmap_sg(clt_path
->s
.dev
->ib_dev
, req
->sglist
,
1130 req
->sg_cnt
, req
->dir
);
1133 inv_wr
= (struct ib_send_wr
) {
1134 .opcode
= IB_WR_LOCAL_INV
,
1135 .wr_cqe
= &req
->inv_cqe
,
1136 .send_flags
= IB_SEND_SIGNALED
,
1137 .ex
.invalidate_rkey
= req
->mr
->rkey
,
1139 req
->inv_cqe
.done
= rtrs_clt_inv_rkey_done
;
1140 rwr
= (struct ib_reg_wr
) {
1141 .wr
.opcode
= IB_WR_REG_MR
,
1142 .wr
.wr_cqe
= &fast_reg_cqe
,
1144 .key
= req
->mr
->rkey
,
1145 .access
= (IB_ACCESS_LOCAL_WRITE
),
1149 refcount_inc(&req
->ref
);
1152 * Update stats now, after request is successfully sent it is not
1153 * safe anymore to touch it.
1155 rtrs_clt_update_all_stats(req
, WRITE
);
1157 ret
= rtrs_post_rdma_write_sg(req
->con
, req
, rbuf
, fr_en
, count
,
1158 req
->usr_len
+ sizeof(*msg
),
1162 "Write request failed: error=%d path=%s [%s:%u]\n",
1163 ret
, kobject_name(&clt_path
->kobj
), clt_path
->hca_name
,
1164 clt_path
->hca_port
);
1165 if (req
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
1166 atomic_dec(&clt_path
->stats
->inflight
);
1168 ib_dma_unmap_sg(clt_path
->s
.dev
->ib_dev
, req
->sglist
,
1169 req
->sg_cnt
, req
->dir
);
1175 static int rtrs_clt_read_req(struct rtrs_clt_io_req
*req
)
1177 struct rtrs_clt_con
*con
= req
->con
;
1178 struct rtrs_path
*s
= con
->c
.path
;
1179 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
1180 struct rtrs_msg_rdma_read
*msg
;
1181 struct rtrs_ib_dev
*dev
= clt_path
->s
.dev
;
1183 struct ib_reg_wr rwr
;
1184 struct ib_send_wr
*wr
= NULL
;
1189 const size_t tsize
= sizeof(*msg
) + req
->data_len
+ req
->usr_len
;
1191 if (tsize
> clt_path
->chunk_size
) {
1193 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1194 tsize
, clt_path
->chunk_size
);
1199 count
= ib_dma_map_sg(dev
->ib_dev
, req
->sglist
, req
->sg_cnt
,
1203 "Read request failed, dma map failed\n");
1207 /* put our message into req->buf after user message*/
1208 msg
= req
->iu
->buf
+ req
->usr_len
;
1209 msg
->type
= cpu_to_le16(RTRS_MSG_READ
);
1210 msg
->usr_len
= cpu_to_le16(req
->usr_len
);
1213 ret
= rtrs_map_sg_fr(req
, count
);
1216 "Read request failed, failed to map fast reg. data, err: %d\n",
1218 ib_dma_unmap_sg(dev
->ib_dev
, req
->sglist
, req
->sg_cnt
,
1222 rwr
= (struct ib_reg_wr
) {
1223 .wr
.opcode
= IB_WR_REG_MR
,
1224 .wr
.wr_cqe
= &fast_reg_cqe
,
1226 .key
= req
->mr
->rkey
,
1227 .access
= (IB_ACCESS_LOCAL_WRITE
|
1228 IB_ACCESS_REMOTE_WRITE
),
1232 msg
->sg_cnt
= cpu_to_le16(1);
1233 msg
->flags
= cpu_to_le16(RTRS_MSG_NEED_INVAL_F
);
1235 msg
->desc
[0].addr
= cpu_to_le64(req
->mr
->iova
);
1236 msg
->desc
[0].key
= cpu_to_le32(req
->mr
->rkey
);
1237 msg
->desc
[0].len
= cpu_to_le32(req
->mr
->length
);
1239 /* Further invalidation is required */
1240 req
->need_inv
= !!RTRS_MSG_NEED_INVAL_F
;
1247 * rtrs message will be after the space reserved for disk data and
1250 imm
= req
->permit
->mem_off
+ req
->data_len
+ req
->usr_len
;
1251 imm
= rtrs_to_io_req_imm(imm
);
1252 buf_id
= req
->permit
->mem_id
;
1254 req
->sg_size
= sizeof(*msg
);
1255 req
->sg_size
+= le16_to_cpu(msg
->sg_cnt
) * sizeof(struct rtrs_sg_desc
);
1256 req
->sg_size
+= req
->usr_len
;
1259 * Update stats now, after request is successfully sent it is not
1260 * safe anymore to touch it.
1262 rtrs_clt_update_all_stats(req
, READ
);
1264 ret
= rtrs_post_send_rdma(req
->con
, req
, &clt_path
->rbufs
[buf_id
],
1265 req
->data_len
, imm
, wr
);
1268 "Read request failed: error=%d path=%s [%s:%u]\n",
1269 ret
, kobject_name(&clt_path
->kobj
), clt_path
->hca_name
,
1270 clt_path
->hca_port
);
1271 if (req
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
1272 atomic_dec(&clt_path
->stats
->inflight
);
1273 req
->need_inv
= false;
1275 ib_dma_unmap_sg(dev
->ib_dev
, req
->sglist
,
1276 req
->sg_cnt
, req
->dir
);
1283 * rtrs_clt_failover_req() - Try to find an active path for a failed request
1285 * @fail_req: a failed io request.
1287 static int rtrs_clt_failover_req(struct rtrs_clt_sess
*clt
,
1288 struct rtrs_clt_io_req
*fail_req
)
1290 struct rtrs_clt_path
*alive_path
;
1291 struct rtrs_clt_io_req
*req
;
1292 int err
= -ECONNABORTED
;
1296 for (path_it_init(&it
, clt
);
1297 (alive_path
= it
.next_path(&it
)) && it
.i
< it
.clt
->paths_num
;
1299 if (READ_ONCE(alive_path
->state
) != RTRS_CLT_CONNECTED
)
1301 req
= rtrs_clt_get_copy_req(alive_path
, fail_req
);
1302 if (req
->dir
== DMA_TO_DEVICE
)
1303 err
= rtrs_clt_write_req(req
);
1305 err
= rtrs_clt_read_req(req
);
1307 req
->in_use
= false;
1311 rtrs_clt_inc_failover_cnt(alive_path
->stats
);
1314 path_it_deinit(&it
);
1320 static void fail_all_outstanding_reqs(struct rtrs_clt_path
*clt_path
)
1322 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
1323 struct rtrs_clt_io_req
*req
;
1326 if (!clt_path
->reqs
)
1328 for (i
= 0; i
< clt_path
->queue_depth
; ++i
) {
1329 req
= &clt_path
->reqs
[i
];
1334 * Safely (without notification) complete failed request.
1335 * After completion this request is still useble and can
1336 * be failovered to another path.
1338 complete_rdma_req(req
, -ECONNABORTED
, false, true);
1340 err
= rtrs_clt_failover_req(clt
, req
);
1342 /* Failover failed, notify anyway */
1343 req
->conf(req
->priv
, err
);
1347 static void free_path_reqs(struct rtrs_clt_path
*clt_path
)
1349 struct rtrs_clt_io_req
*req
;
1352 if (!clt_path
->reqs
)
1354 for (i
= 0; i
< clt_path
->queue_depth
; ++i
) {
1355 req
= &clt_path
->reqs
[i
];
1357 ib_dereg_mr(req
->mr
);
1359 rtrs_iu_free(req
->iu
, clt_path
->s
.dev
->ib_dev
, 1);
1361 kfree(clt_path
->reqs
);
1362 clt_path
->reqs
= NULL
;
1365 static int alloc_path_reqs(struct rtrs_clt_path
*clt_path
)
1367 struct rtrs_clt_io_req
*req
;
1368 int i
, err
= -ENOMEM
;
1370 clt_path
->reqs
= kcalloc(clt_path
->queue_depth
,
1371 sizeof(*clt_path
->reqs
),
1373 if (!clt_path
->reqs
)
1376 for (i
= 0; i
< clt_path
->queue_depth
; ++i
) {
1377 req
= &clt_path
->reqs
[i
];
1378 req
->iu
= rtrs_iu_alloc(1, clt_path
->max_hdr_size
, GFP_KERNEL
,
1379 clt_path
->s
.dev
->ib_dev
,
1381 rtrs_clt_rdma_done
);
1385 req
->sge
= kcalloc(2, sizeof(*req
->sge
), GFP_KERNEL
);
1389 req
->mr
= ib_alloc_mr(clt_path
->s
.dev
->ib_pd
,
1391 clt_path
->max_pages_per_mr
);
1392 if (IS_ERR(req
->mr
)) {
1393 err
= PTR_ERR(req
->mr
);
1395 pr_err("Failed to alloc clt_path->max_pages_per_mr %d\n",
1396 clt_path
->max_pages_per_mr
);
1400 init_completion(&req
->inv_comp
);
1406 free_path_reqs(clt_path
);
1411 static int alloc_permits(struct rtrs_clt_sess
*clt
)
1413 unsigned int chunk_bits
;
1416 clt
->permits_map
= bitmap_zalloc(clt
->queue_depth
, GFP_KERNEL
);
1417 if (!clt
->permits_map
) {
1421 clt
->permits
= kcalloc(clt
->queue_depth
, permit_size(clt
), GFP_KERNEL
);
1422 if (!clt
->permits
) {
1426 chunk_bits
= ilog2(clt
->queue_depth
- 1) + 1;
1427 for (i
= 0; i
< clt
->queue_depth
; i
++) {
1428 struct rtrs_permit
*permit
;
1430 permit
= get_permit(clt
, i
);
1432 permit
->mem_off
= i
<< (MAX_IMM_PAYL_BITS
- chunk_bits
);
1438 bitmap_free(clt
->permits_map
);
1439 clt
->permits_map
= NULL
;
1444 static void free_permits(struct rtrs_clt_sess
*clt
)
1446 if (clt
->permits_map
)
1447 wait_event(clt
->permits_wait
,
1448 bitmap_empty(clt
->permits_map
, clt
->queue_depth
));
1450 bitmap_free(clt
->permits_map
);
1451 clt
->permits_map
= NULL
;
1452 kfree(clt
->permits
);
1453 clt
->permits
= NULL
;
1456 static void query_fast_reg_mode(struct rtrs_clt_path
*clt_path
)
1458 struct ib_device
*ib_dev
;
1459 u64 max_pages_per_mr
;
1462 ib_dev
= clt_path
->s
.dev
->ib_dev
;
1465 * Use the smallest page size supported by the HCA, down to a
1466 * minimum of 4096 bytes. We're unlikely to build large sglists
1467 * out of smaller entries.
1469 mr_page_shift
= max(12, ffs(ib_dev
->attrs
.page_size_cap
) - 1);
1470 max_pages_per_mr
= ib_dev
->attrs
.max_mr_size
;
1471 do_div(max_pages_per_mr
, (1ull << mr_page_shift
));
1472 clt_path
->max_pages_per_mr
=
1473 min3(clt_path
->max_pages_per_mr
, (u32
)max_pages_per_mr
,
1474 ib_dev
->attrs
.max_fast_reg_page_list_len
);
1475 clt_path
->clt
->max_segments
=
1476 min(clt_path
->max_pages_per_mr
, clt_path
->clt
->max_segments
);
1479 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_path
*clt_path
,
1480 enum rtrs_clt_state new_state
,
1481 enum rtrs_clt_state
*old_state
)
1485 spin_lock_irq(&clt_path
->state_wq
.lock
);
1487 *old_state
= clt_path
->state
;
1488 changed
= rtrs_clt_change_state(clt_path
, new_state
);
1489 spin_unlock_irq(&clt_path
->state_wq
.lock
);
1494 static void rtrs_clt_hb_err_handler(struct rtrs_con
*c
)
1496 struct rtrs_clt_con
*con
= container_of(c
, typeof(*con
), c
);
1498 rtrs_rdma_error_recovery(con
);
1501 static void rtrs_clt_init_hb(struct rtrs_clt_path
*clt_path
)
1503 rtrs_init_hb(&clt_path
->s
, &io_comp_cqe
,
1504 RTRS_HB_INTERVAL_MS
,
1506 rtrs_clt_hb_err_handler
,
1510 static void rtrs_clt_reconnect_work(struct work_struct
*work
);
1511 static void rtrs_clt_close_work(struct work_struct
*work
);
1513 static void rtrs_clt_err_recovery_work(struct work_struct
*work
)
1515 struct rtrs_clt_path
*clt_path
;
1516 struct rtrs_clt_sess
*clt
;
1519 clt_path
= container_of(work
, struct rtrs_clt_path
, err_recovery_work
);
1520 clt
= clt_path
->clt
;
1521 delay_ms
= clt
->reconnect_delay_sec
* 1000;
1522 rtrs_clt_stop_and_destroy_conns(clt_path
);
1523 queue_delayed_work(rtrs_wq
, &clt_path
->reconnect_dwork
,
1524 msecs_to_jiffies(delay_ms
+
1525 get_random_u32_below(RTRS_RECONNECT_SEED
)));
1528 static struct rtrs_clt_path
*alloc_path(struct rtrs_clt_sess
*clt
,
1529 const struct rtrs_addr
*path
,
1530 size_t con_num
, u32 nr_poll_queues
)
1532 struct rtrs_clt_path
*clt_path
;
1537 clt_path
= kzalloc(sizeof(*clt_path
), GFP_KERNEL
);
1543 * +1: Extra connection for user messages
1545 total_con
= con_num
+ nr_poll_queues
+ 1;
1546 clt_path
->s
.con
= kcalloc(total_con
, sizeof(*clt_path
->s
.con
),
1548 if (!clt_path
->s
.con
)
1551 clt_path
->s
.con_num
= total_con
;
1552 clt_path
->s
.irq_con_num
= con_num
+ 1;
1554 clt_path
->stats
= kzalloc(sizeof(*clt_path
->stats
), GFP_KERNEL
);
1555 if (!clt_path
->stats
)
1558 mutex_init(&clt_path
->init_mutex
);
1559 uuid_gen(&clt_path
->s
.uuid
);
1560 memcpy(&clt_path
->s
.dst_addr
, path
->dst
,
1561 rdma_addr_size((struct sockaddr
*)path
->dst
));
1564 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1565 * checks the sa_family to be non-zero. If user passed src_addr=NULL
1566 * the sess->src_addr will contain only zeros, which is then fine.
1569 memcpy(&clt_path
->s
.src_addr
, path
->src
,
1570 rdma_addr_size((struct sockaddr
*)path
->src
));
1571 strscpy(clt_path
->s
.sessname
, clt
->sessname
,
1572 sizeof(clt_path
->s
.sessname
));
1573 clt_path
->clt
= clt
;
1574 clt_path
->max_pages_per_mr
= RTRS_MAX_SEGMENTS
;
1575 init_waitqueue_head(&clt_path
->state_wq
);
1576 clt_path
->state
= RTRS_CLT_CONNECTING
;
1577 atomic_set(&clt_path
->connected_cnt
, 0);
1578 INIT_WORK(&clt_path
->close_work
, rtrs_clt_close_work
);
1579 INIT_WORK(&clt_path
->err_recovery_work
, rtrs_clt_err_recovery_work
);
1580 INIT_DELAYED_WORK(&clt_path
->reconnect_dwork
, rtrs_clt_reconnect_work
);
1581 rtrs_clt_init_hb(clt_path
);
1583 clt_path
->mp_skip_entry
= alloc_percpu(typeof(*clt_path
->mp_skip_entry
));
1584 if (!clt_path
->mp_skip_entry
)
1585 goto err_free_stats
;
1587 for_each_possible_cpu(cpu
)
1588 INIT_LIST_HEAD(per_cpu_ptr(clt_path
->mp_skip_entry
, cpu
));
1590 err
= rtrs_clt_init_stats(clt_path
->stats
);
1592 goto err_free_percpu
;
1597 free_percpu(clt_path
->mp_skip_entry
);
1599 kfree(clt_path
->stats
);
1601 kfree(clt_path
->s
.con
);
1605 return ERR_PTR(err
);
1608 void free_path(struct rtrs_clt_path
*clt_path
)
1610 free_percpu(clt_path
->mp_skip_entry
);
1611 mutex_destroy(&clt_path
->init_mutex
);
1612 kfree(clt_path
->s
.con
);
1613 kfree(clt_path
->rbufs
);
1617 static int create_con(struct rtrs_clt_path
*clt_path
, unsigned int cid
)
1619 struct rtrs_clt_con
*con
;
1621 con
= kzalloc(sizeof(*con
), GFP_KERNEL
);
1625 /* Map first two connections to the first CPU */
1626 con
->cpu
= (cid
? cid
- 1 : 0) % nr_cpu_ids
;
1628 con
->c
.path
= &clt_path
->s
;
1629 /* Align with srv, init as 1 */
1630 atomic_set(&con
->c
.wr_cnt
, 1);
1631 mutex_init(&con
->con_mutex
);
1633 clt_path
->s
.con
[cid
] = &con
->c
;
1638 static void destroy_con(struct rtrs_clt_con
*con
)
1640 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1642 clt_path
->s
.con
[con
->c
.cid
] = NULL
;
1643 mutex_destroy(&con
->con_mutex
);
1647 static int create_con_cq_qp(struct rtrs_clt_con
*con
)
1649 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1650 u32 max_send_wr
, max_recv_wr
, cq_num
, max_send_sge
, wr_limit
;
1652 struct rtrs_msg_rkey_rsp
*rsp
;
1654 lockdep_assert_held(&con
->con_mutex
);
1655 if (con
->c
.cid
== 0) {
1657 /* We must be the first here */
1658 if (WARN_ON(clt_path
->s
.dev
))
1662 * The whole session uses device from user connection.
1663 * Be careful not to close user connection before ib dev
1664 * is gracefully put.
1666 clt_path
->s
.dev
= rtrs_ib_dev_find_or_add(con
->c
.cm_id
->device
,
1668 if (!clt_path
->s
.dev
) {
1669 rtrs_wrn(clt_path
->clt
,
1670 "rtrs_ib_dev_find_get_or_add(): no memory\n");
1673 clt_path
->s
.dev_ref
= 1;
1674 query_fast_reg_mode(clt_path
);
1675 wr_limit
= clt_path
->s
.dev
->ib_dev
->attrs
.max_qp_wr
;
1677 * Two (request + registration) completion for send
1678 * Two for recv if always_invalidate is set on server
1680 * + 2 for drain and heartbeat
1681 * in case qp gets into error state.
1684 min_t(int, wr_limit
, SERVICE_CON_QUEUE_DEPTH
* 2 + 2);
1685 max_recv_wr
= max_send_wr
;
1688 * Here we assume that session members are correctly set.
1689 * This is always true if user connection (cid == 0) is
1690 * established first.
1692 if (WARN_ON(!clt_path
->s
.dev
))
1694 if (WARN_ON(!clt_path
->queue_depth
))
1697 wr_limit
= clt_path
->s
.dev
->ib_dev
->attrs
.max_qp_wr
;
1698 /* Shared between connections */
1699 clt_path
->s
.dev_ref
++;
1700 max_send_wr
= min_t(int, wr_limit
,
1701 /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1702 clt_path
->queue_depth
* 4 + 1);
1703 max_recv_wr
= min_t(int, wr_limit
,
1704 clt_path
->queue_depth
* 3 + 1);
1707 atomic_set(&con
->c
.sq_wr_avail
, max_send_wr
);
1708 cq_num
= max_send_wr
+ max_recv_wr
;
1709 /* alloc iu to recv new rkey reply when server reports flags set */
1710 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
|| con
->c
.cid
== 0) {
1711 con
->rsp_ius
= rtrs_iu_alloc(cq_num
, sizeof(*rsp
),
1713 clt_path
->s
.dev
->ib_dev
,
1715 rtrs_clt_rdma_done
);
1718 con
->queue_num
= cq_num
;
1720 cq_vector
= con
->cpu
% clt_path
->s
.dev
->ib_dev
->num_comp_vectors
;
1721 if (con
->c
.cid
>= clt_path
->s
.irq_con_num
)
1722 err
= rtrs_cq_qp_create(&clt_path
->s
, &con
->c
, max_send_sge
,
1723 cq_vector
, cq_num
, max_send_wr
,
1724 max_recv_wr
, IB_POLL_DIRECT
);
1726 err
= rtrs_cq_qp_create(&clt_path
->s
, &con
->c
, max_send_sge
,
1727 cq_vector
, cq_num
, max_send_wr
,
1728 max_recv_wr
, IB_POLL_SOFTIRQ
);
1730 * In case of error we do not bother to clean previous allocations,
1731 * since destroy_con_cq_qp() must be called.
1736 static void destroy_con_cq_qp(struct rtrs_clt_con
*con
)
1738 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1741 * Be careful here: destroy_con_cq_qp() can be called even
1742 * create_con_cq_qp() failed, see comments there.
1744 lockdep_assert_held(&con
->con_mutex
);
1745 rtrs_cq_qp_destroy(&con
->c
);
1747 rtrs_iu_free(con
->rsp_ius
, clt_path
->s
.dev
->ib_dev
,
1749 con
->rsp_ius
= NULL
;
1752 if (clt_path
->s
.dev_ref
&& !--clt_path
->s
.dev_ref
) {
1753 rtrs_ib_dev_put(clt_path
->s
.dev
);
1754 clt_path
->s
.dev
= NULL
;
1758 static void stop_cm(struct rtrs_clt_con
*con
)
1760 rdma_disconnect(con
->c
.cm_id
);
1762 ib_drain_qp(con
->c
.qp
);
1765 static void destroy_cm(struct rtrs_clt_con
*con
)
1767 rdma_destroy_id(con
->c
.cm_id
);
1768 con
->c
.cm_id
= NULL
;
1771 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con
*con
)
1773 struct rtrs_path
*s
= con
->c
.path
;
1776 mutex_lock(&con
->con_mutex
);
1777 err
= create_con_cq_qp(con
);
1778 mutex_unlock(&con
->con_mutex
);
1780 rtrs_err(s
, "create_con_cq_qp(), err: %d\n", err
);
1783 err
= rdma_resolve_route(con
->c
.cm_id
, RTRS_CONNECT_TIMEOUT_MS
);
1785 rtrs_err(s
, "Resolving route failed, err: %d\n", err
);
1790 static int rtrs_rdma_route_resolved(struct rtrs_clt_con
*con
)
1792 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1793 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
1794 struct rtrs_msg_conn_req msg
;
1795 struct rdma_conn_param param
;
1799 param
= (struct rdma_conn_param
) {
1801 .rnr_retry_count
= 7,
1802 .private_data
= &msg
,
1803 .private_data_len
= sizeof(msg
),
1806 msg
= (struct rtrs_msg_conn_req
) {
1807 .magic
= cpu_to_le16(RTRS_MAGIC
),
1808 .version
= cpu_to_le16(RTRS_PROTO_VER
),
1809 .cid
= cpu_to_le16(con
->c
.cid
),
1810 .cid_num
= cpu_to_le16(clt_path
->s
.con_num
),
1811 .recon_cnt
= cpu_to_le16(clt_path
->s
.recon_cnt
),
1813 msg
.first_conn
= clt_path
->for_new_clt
? FIRST_CONN
: 0;
1814 uuid_copy(&msg
.sess_uuid
, &clt_path
->s
.uuid
);
1815 uuid_copy(&msg
.paths_uuid
, &clt
->paths_uuid
);
1817 err
= rdma_connect_locked(con
->c
.cm_id
, ¶m
);
1819 rtrs_err(clt
, "rdma_connect_locked(): %d\n", err
);
1824 static int rtrs_rdma_conn_established(struct rtrs_clt_con
*con
,
1825 struct rdma_cm_event
*ev
)
1827 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1828 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
1829 const struct rtrs_msg_conn_rsp
*msg
;
1830 u16 version
, queue_depth
;
1834 msg
= ev
->param
.conn
.private_data
;
1835 len
= ev
->param
.conn
.private_data_len
;
1836 if (len
< sizeof(*msg
)) {
1837 rtrs_err(clt
, "Invalid RTRS connection response\n");
1840 if (le16_to_cpu(msg
->magic
) != RTRS_MAGIC
) {
1841 rtrs_err(clt
, "Invalid RTRS magic\n");
1844 version
= le16_to_cpu(msg
->version
);
1845 if (version
>> 8 != RTRS_PROTO_VER_MAJOR
) {
1846 rtrs_err(clt
, "Unsupported major RTRS version: %d, expected %d\n",
1847 version
>> 8, RTRS_PROTO_VER_MAJOR
);
1850 errno
= le16_to_cpu(msg
->errno
);
1852 rtrs_err(clt
, "Invalid RTRS message: errno %d\n",
1856 if (con
->c
.cid
== 0) {
1857 queue_depth
= le16_to_cpu(msg
->queue_depth
);
1859 if (clt_path
->queue_depth
> 0 && queue_depth
!= clt_path
->queue_depth
) {
1860 rtrs_err(clt
, "Error: queue depth changed\n");
1863 * Stop any more reconnection attempts
1865 clt_path
->reconnect_attempts
= -1;
1867 "Disabling auto-reconnect. Trigger a manual reconnect after issue is resolved\n");
1871 if (!clt_path
->rbufs
) {
1872 clt_path
->rbufs
= kcalloc(queue_depth
,
1873 sizeof(*clt_path
->rbufs
),
1875 if (!clt_path
->rbufs
)
1878 clt_path
->queue_depth
= queue_depth
;
1879 clt_path
->s
.signal_interval
= min_not_zero(queue_depth
,
1880 (unsigned short) SERVICE_CON_QUEUE_DEPTH
);
1881 clt_path
->max_hdr_size
= le32_to_cpu(msg
->max_hdr_size
);
1882 clt_path
->max_io_size
= le32_to_cpu(msg
->max_io_size
);
1883 clt_path
->flags
= le32_to_cpu(msg
->flags
);
1884 clt_path
->chunk_size
= clt_path
->max_io_size
+ clt_path
->max_hdr_size
;
1887 * Global IO size is always a minimum.
1888 * If while a reconnection server sends us a value a bit
1889 * higher - client does not care and uses cached minimum.
1891 * Since we can have several sessions (paths) restablishing
1892 * connections in parallel, use lock.
1894 mutex_lock(&clt
->paths_mutex
);
1895 clt
->queue_depth
= clt_path
->queue_depth
;
1896 clt
->max_io_size
= min_not_zero(clt_path
->max_io_size
,
1898 mutex_unlock(&clt
->paths_mutex
);
1901 * Cache the hca_port and hca_name for sysfs
1903 clt_path
->hca_port
= con
->c
.cm_id
->port_num
;
1904 scnprintf(clt_path
->hca_name
, sizeof(clt_path
->hca_name
),
1905 clt_path
->s
.dev
->ib_dev
->name
);
1906 clt_path
->s
.src_addr
= con
->c
.cm_id
->route
.addr
.src_addr
;
1907 /* set for_new_clt, to allow future reconnect on any path */
1908 clt_path
->for_new_clt
= 1;
1914 static inline void flag_success_on_conn(struct rtrs_clt_con
*con
)
1916 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1918 atomic_inc(&clt_path
->connected_cnt
);
1922 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con
*con
,
1923 struct rdma_cm_event
*ev
)
1925 struct rtrs_path
*s
= con
->c
.path
;
1926 const struct rtrs_msg_conn_rsp
*msg
;
1927 const char *rej_msg
;
1931 status
= ev
->status
;
1932 rej_msg
= rdma_reject_msg(con
->c
.cm_id
, status
);
1933 msg
= rdma_consumer_reject_data(con
->c
.cm_id
, ev
, &data_len
);
1935 if (msg
&& data_len
>= sizeof(*msg
)) {
1936 errno
= (int16_t)le16_to_cpu(msg
->errno
);
1937 if (errno
== -EBUSY
)
1939 "Previous session is still exists on the server, please reconnect later\n");
1942 "Connect rejected: status %d (%s), rtrs errno %d\n",
1943 status
, rej_msg
, errno
);
1946 "Connect rejected but with malformed message: status %d (%s)\n",
1953 void rtrs_clt_close_conns(struct rtrs_clt_path
*clt_path
, bool wait
)
1955 trace_rtrs_clt_close_conns(clt_path
);
1957 if (rtrs_clt_change_state_get_old(clt_path
, RTRS_CLT_CLOSING
, NULL
))
1958 queue_work(rtrs_wq
, &clt_path
->close_work
);
1960 flush_work(&clt_path
->close_work
);
1963 static inline void flag_error_on_conn(struct rtrs_clt_con
*con
, int cm_err
)
1965 if (con
->cm_err
== 1) {
1966 struct rtrs_clt_path
*clt_path
;
1968 clt_path
= to_clt_path(con
->c
.path
);
1969 if (atomic_dec_and_test(&clt_path
->connected_cnt
))
1971 wake_up(&clt_path
->state_wq
);
1973 con
->cm_err
= cm_err
;
1976 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id
*cm_id
,
1977 struct rdma_cm_event
*ev
)
1979 struct rtrs_clt_con
*con
= cm_id
->context
;
1980 struct rtrs_path
*s
= con
->c
.path
;
1981 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
1984 switch (ev
->event
) {
1985 case RDMA_CM_EVENT_ADDR_RESOLVED
:
1986 cm_err
= rtrs_rdma_addr_resolved(con
);
1988 case RDMA_CM_EVENT_ROUTE_RESOLVED
:
1989 cm_err
= rtrs_rdma_route_resolved(con
);
1991 case RDMA_CM_EVENT_ESTABLISHED
:
1992 cm_err
= rtrs_rdma_conn_established(con
, ev
);
1995 * Report success and wake up. Here we abuse state_wq,
1996 * i.e. wake up without state change, but we set cm_err.
1998 flag_success_on_conn(con
);
1999 wake_up(&clt_path
->state_wq
);
2003 case RDMA_CM_EVENT_REJECTED
:
2004 cm_err
= rtrs_rdma_conn_rejected(con
, ev
);
2006 case RDMA_CM_EVENT_DISCONNECTED
:
2007 /* No message for disconnecting */
2008 cm_err
= -ECONNRESET
;
2010 case RDMA_CM_EVENT_CONNECT_ERROR
:
2011 case RDMA_CM_EVENT_UNREACHABLE
:
2012 case RDMA_CM_EVENT_ADDR_CHANGE
:
2013 case RDMA_CM_EVENT_TIMEWAIT_EXIT
:
2014 rtrs_wrn(s
, "CM error (CM event: %s, err: %d)\n",
2015 rdma_event_msg(ev
->event
), ev
->status
);
2016 cm_err
= -ECONNRESET
;
2018 case RDMA_CM_EVENT_ADDR_ERROR
:
2019 case RDMA_CM_EVENT_ROUTE_ERROR
:
2020 rtrs_wrn(s
, "CM error (CM event: %s, err: %d)\n",
2021 rdma_event_msg(ev
->event
), ev
->status
);
2022 cm_err
= -EHOSTUNREACH
;
2024 case RDMA_CM_EVENT_DEVICE_REMOVAL
:
2026 * Device removal is a special case. Queue close and return 0.
2028 rtrs_clt_close_conns(clt_path
, false);
2031 rtrs_err(s
, "Unexpected RDMA CM error (CM event: %s, err: %d)\n",
2032 rdma_event_msg(ev
->event
), ev
->status
);
2033 cm_err
= -ECONNRESET
;
2039 * cm error makes sense only on connection establishing,
2040 * in other cases we rely on normal procedure of reconnecting.
2042 flag_error_on_conn(con
, cm_err
);
2043 rtrs_rdma_error_recovery(con
);
2049 /* The caller should do the cleanup in case of error */
2050 static int create_cm(struct rtrs_clt_con
*con
)
2052 struct rtrs_path
*s
= con
->c
.path
;
2053 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
2054 struct rdma_cm_id
*cm_id
;
2057 cm_id
= rdma_create_id(&init_net
, rtrs_clt_rdma_cm_handler
, con
,
2058 clt_path
->s
.dst_addr
.ss_family
== AF_IB
?
2059 RDMA_PS_IB
: RDMA_PS_TCP
, IB_QPT_RC
);
2060 if (IS_ERR(cm_id
)) {
2061 err
= PTR_ERR(cm_id
);
2062 rtrs_err(s
, "Failed to create CM ID, err: %d\n", err
);
2066 con
->c
.cm_id
= cm_id
;
2068 /* allow the port to be reused */
2069 err
= rdma_set_reuseaddr(cm_id
, 1);
2071 rtrs_err(s
, "Set address reuse failed, err: %d\n", err
);
2074 err
= rdma_resolve_addr(cm_id
, (struct sockaddr
*)&clt_path
->s
.src_addr
,
2075 (struct sockaddr
*)&clt_path
->s
.dst_addr
,
2076 RTRS_CONNECT_TIMEOUT_MS
);
2078 rtrs_err(s
, "Failed to resolve address, err: %d\n", err
);
2082 * Combine connection status and session events. This is needed
2083 * for waiting two possible cases: cm_err has something meaningful
2084 * or session state was really changed to error by device removal.
2086 err
= wait_event_interruptible_timeout(
2088 con
->cm_err
|| clt_path
->state
!= RTRS_CLT_CONNECTING
,
2089 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS
));
2090 if (err
== 0 || err
== -ERESTARTSYS
) {
2093 /* Timedout or interrupted */
2096 if (con
->cm_err
< 0)
2098 if (READ_ONCE(clt_path
->state
) != RTRS_CLT_CONNECTING
)
2099 /* Device removal */
2100 return -ECONNABORTED
;
2105 static void rtrs_clt_path_up(struct rtrs_clt_path
*clt_path
)
2107 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
2111 * We can fire RECONNECTED event only when all paths were
2112 * connected on rtrs_clt_open(), then each was disconnected
2113 * and the first one connected again. That's why this nasty
2114 * game with counter value.
2117 mutex_lock(&clt
->paths_ev_mutex
);
2118 up
= ++clt
->paths_up
;
2120 * Here it is safe to access paths num directly since up counter
2121 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
2122 * in progress, thus paths removals are impossible.
2124 if (up
> MAX_PATHS_NUM
&& up
== MAX_PATHS_NUM
+ clt
->paths_num
)
2125 clt
->paths_up
= clt
->paths_num
;
2127 clt
->link_ev(clt
->priv
, RTRS_CLT_LINK_EV_RECONNECTED
);
2128 mutex_unlock(&clt
->paths_ev_mutex
);
2130 /* Mark session as established */
2131 clt_path
->established
= true;
2132 clt_path
->reconnect_attempts
= 0;
2133 clt_path
->stats
->reconnects
.successful_cnt
++;
2136 static void rtrs_clt_path_down(struct rtrs_clt_path
*clt_path
)
2138 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
2140 if (!clt_path
->established
)
2143 clt_path
->established
= false;
2144 mutex_lock(&clt
->paths_ev_mutex
);
2145 WARN_ON(!clt
->paths_up
);
2146 if (--clt
->paths_up
== 0)
2147 clt
->link_ev(clt
->priv
, RTRS_CLT_LINK_EV_DISCONNECTED
);
2148 mutex_unlock(&clt
->paths_ev_mutex
);
2151 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_path
*clt_path
)
2153 struct rtrs_clt_con
*con
;
2156 WARN_ON(READ_ONCE(clt_path
->state
) == RTRS_CLT_CONNECTED
);
2159 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2160 * exactly in between. Start destroying after it finishes.
2162 mutex_lock(&clt_path
->init_mutex
);
2163 mutex_unlock(&clt_path
->init_mutex
);
2166 * All IO paths must observe !CONNECTED state before we
2171 rtrs_stop_hb(&clt_path
->s
);
2174 * The order it utterly crucial: firstly disconnect and complete all
2175 * rdma requests with error (thus set in_use=false for requests),
2176 * then fail outstanding requests checking in_use for each, and
2177 * eventually notify upper layer about session disconnection.
2180 for (cid
= 0; cid
< clt_path
->s
.con_num
; cid
++) {
2181 if (!clt_path
->s
.con
[cid
])
2183 con
= to_clt_con(clt_path
->s
.con
[cid
]);
2186 fail_all_outstanding_reqs(clt_path
);
2187 free_path_reqs(clt_path
);
2188 rtrs_clt_path_down(clt_path
);
2191 * Wait for graceful shutdown, namely when peer side invokes
2192 * rdma_disconnect(). 'connected_cnt' is decremented only on
2193 * CM events, thus if other side had crashed and hb has detected
2194 * something is wrong, here we will stuck for exactly timeout ms,
2195 * since CM does not fire anything. That is fine, we are not in
2198 wait_event_timeout(clt_path
->state_wq
,
2199 !atomic_read(&clt_path
->connected_cnt
),
2200 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS
));
2202 for (cid
= 0; cid
< clt_path
->s
.con_num
; cid
++) {
2203 if (!clt_path
->s
.con
[cid
])
2205 con
= to_clt_con(clt_path
->s
.con
[cid
]);
2206 mutex_lock(&con
->con_mutex
);
2207 destroy_con_cq_qp(con
);
2208 mutex_unlock(&con
->con_mutex
);
static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_path *clt_path)
{
	struct rtrs_clt_sess *clt = clt_path->clt;
	struct rtrs_clt_path *next;
	bool wait_for_grace = false;
	int cpu;

	mutex_lock(&clt->paths_mutex);
	list_del_rcu(&clt_path->s.entry);

	/* Make sure everybody observes path removal. */
	synchronize_rcu();

	/*
	 * At this point nobody sees @sess in the list, but still we have
	 * dangling pointer @pcpu_path which _can_ point to @sess.  Since
	 * nobody can observe @sess in the list, we guarantee that IO path
	 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
	 * to @sess, but can never again become @sess.
	 */

	/*
	 * Decrement paths number only after grace period, because
	 * the caller of do_each_path() must first observe the list without
	 * the path and only then the decremented paths number.
	 *
	 * Otherwise there can be the following situation:
	 *    o Two paths exist and IO is coming.
	 *    o One path is removed:
	 *      CPU#0                         CPU#1
	 *      do_each_path():               rtrs_clt_remove_path_from_arr():
	 *          path = get_next_path()
	 *          ^^^                           list_del_rcu(path)
	 *          [!CONNECTED path]             clt->paths_num--
	 *
	 *          load clt->paths_num from 2 to 1
	 *
	 *          path is observed as !CONNECTED, but do_each_path() loop
	 *          ends, because expression i < clt->paths_num is false.
	 */
	clt->paths_num--;

	/*
	 * Get @next connection from current @sess which is going to be
	 * removed.  If @sess is the last element, then @next is NULL.
	 */
	rcu_read_lock();
	next = rtrs_clt_get_next_path_or_null(&clt->paths_list, clt_path);
	rcu_read_unlock();

	/*
	 * @pcpu paths can still point to the path which is going to be
	 * removed, so change the pointer manually.
	 */
	for_each_possible_cpu(cpu) {
		struct rtrs_clt_path __rcu **ppcpu_path;

		ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
		if (rcu_dereference_protected(*ppcpu_path,
			lockdep_is_held(&clt->paths_mutex)) != clt_path)
			/*
			 * synchronize_rcu() was called just after deleting
			 * entry from the list, thus IO code path cannot
			 * change pointer back to the pointer which is going
			 * to be removed, we are safe here.
			 */
			continue;

		/*
		 * We race with IO code path, which also changes pointer,
		 * thus we have to be careful not to overwrite it.
		 */
		if (try_cmpxchg((struct rtrs_clt_path **)ppcpu_path, &clt_path,
				next))
			/*
			 * @ppcpu_path was successfully replaced with @next,
			 * that means that someone could also pick up the
			 * @sess and dereference it right now, so waiting for
			 * a grace period is required.
			 */
			wait_for_grace = true;
	}
	if (wait_for_grace)
		synchronize_rcu();

	mutex_unlock(&clt->paths_mutex);
}
static void rtrs_clt_add_path_to_arr(struct rtrs_clt_path *clt_path)
{
	struct rtrs_clt_sess *clt = clt_path->clt;

	mutex_lock(&clt->paths_mutex);
	clt->paths_num++;

	list_add_tail_rcu(&clt_path->s.entry, &clt->paths_list);
	mutex_unlock(&clt->paths_mutex);
}
static void rtrs_clt_close_work(struct work_struct *work)
{
	struct rtrs_clt_path *clt_path;

	clt_path = container_of(work, struct rtrs_clt_path, close_work);

	cancel_work_sync(&clt_path->err_recovery_work);
	cancel_delayed_work_sync(&clt_path->reconnect_dwork);
	rtrs_clt_stop_and_destroy_conns(clt_path);
	rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CLOSED, NULL);
}
static int init_conns(struct rtrs_clt_path *clt_path)
{
	unsigned int cid;
	int err, i;

	/*
	 * On every new session connections increase reconnect counter
	 * to avoid clashes with previous sessions not yet closed
	 * on the server side.
	 */
	clt_path->s.recon_cnt++;

	/* Establish all RDMA connections */
	for (cid = 0; cid < clt_path->s.con_num; cid++) {
		err = create_con(clt_path, cid);
		if (err)
			goto destroy;

		err = create_cm(to_clt_con(clt_path->s.con[cid]));
		if (err)
			goto destroy;
	}
	err = alloc_path_reqs(clt_path);
	if (err)
		goto destroy;

	return 0;

destroy:
	/* Make sure we do the cleanup in the order they are created */
	for (i = 0; i <= cid; i++) {
		struct rtrs_clt_con *con;

		if (!clt_path->s.con[i])
			break;

		con = to_clt_con(clt_path->s.con[i]);
		if (con->c.cm_id) {
			stop_cm(con);
			mutex_lock(&con->con_mutex);
			destroy_con_cq_qp(con);
			mutex_unlock(&con->con_mutex);
			destroy_cm(con);
		}
		destroy_con(con);
	}
	/*
	 * If we've never taken async path and got an error, say,
	 * doing rdma_resolve_addr(), switch to CONNECTION_ERR state
	 * manually to keep reconnecting.
	 */
	rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING_ERR, NULL);

	return err;
}
static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, clt_path->s.dev->ib_dev, 1);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(clt_path->clt, "Path info request send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING_ERR, NULL);
		return;
	}

	rtrs_clt_update_wc_stats(con);
}
static int process_info_rsp(struct rtrs_clt_path *clt_path,
			    const struct rtrs_msg_info_rsp *msg)
{
	unsigned int sg_cnt, total_len;
	int i, sgi;

	sg_cnt = le16_to_cpu(msg->sg_cnt);
	if (!sg_cnt || (clt_path->queue_depth % sg_cnt)) {
		rtrs_err(clt_path->clt,
			 "Incorrect sg_cnt %d, is not multiple\n",
			 sg_cnt);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and
	 * the offset inside the memory chunk.
	 */
	if ((ilog2(sg_cnt - 1) + 1) + (ilog2(clt_path->chunk_size - 1) + 1) >
	    MAX_IMM_PAYL_BITS) {
		rtrs_err(clt_path->clt,
			 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
			 MAX_IMM_PAYL_BITS, sg_cnt, clt_path->chunk_size);
		return -EINVAL;
	}
	total_len = 0;
	for (sgi = 0, i = 0; sgi < sg_cnt && i < clt_path->queue_depth; sgi++) {
		const struct rtrs_sg_desc *desc = &msg->desc[sgi];
		u32 len, rkey;
		u64 addr;

		addr = le64_to_cpu(desc->addr);
		rkey = le32_to_cpu(desc->key);
		len  = le32_to_cpu(desc->len);

		total_len += len;

		if (!len || (len % clt_path->chunk_size)) {
			rtrs_err(clt_path->clt, "Incorrect [%d].len %d\n",
				 sgi, len);
			return -EINVAL;
		}
		for ( ; len && i < clt_path->queue_depth; i++) {
			clt_path->rbufs[i].addr = addr;
			clt_path->rbufs[i].rkey = rkey;

			len  -= clt_path->chunk_size;
			addr += clt_path->chunk_size;
		}
	}
	/* Sanity check */
	if (sgi != sg_cnt || i != clt_path->queue_depth) {
		rtrs_err(clt_path->clt,
			 "Incorrect sg vector, not fully mapped\n");
		return -EINVAL;
	}
	if (total_len != clt_path->chunk_size * clt_path->queue_depth) {
		rtrs_err(clt_path->clt, "Incorrect total_len %d\n", total_len);
		return -EINVAL;
	}

	return 0;
}
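/*
 * Worked example (editorial note, not part of the driver): the check in
 * process_info_rsp() above makes sure one RDMA immediate value can carry
 * both a buffer index and a byte offset inside a chunk.  Assuming, say,
 * sg_cnt == 128 buffers and chunk_size == 512 KiB, the index needs
 * ilog2(128 - 1) + 1 = 7 bits and the offset needs
 * ilog2(SZ_512K - 1) + 1 = 19 bits, i.e. 26 bits in total, which must not
 * exceed MAX_IMM_PAYL_BITS.
 */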
static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
	struct rtrs_msg_info_rsp *msg;
	enum rtrs_clt_state state;
	struct rtrs_iu *iu;
	size_t rx_sz;
	int err;

	state = RTRS_CLT_CONNECTING_ERR;

	WARN_ON(con->c.cid);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(clt_path->clt, "Path info response recv failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto out;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (wc->byte_len < sizeof(*msg)) {
		rtrs_err(clt_path->clt, "Path info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(clt_path->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP) {
		rtrs_err(clt_path->clt, "Path info response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	rx_sz  = sizeof(*msg);
	rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
	if (wc->byte_len < rx_sz) {
		rtrs_err(clt_path->clt, "Path info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	err = process_info_rsp(clt_path, msg);
	if (err)
		goto out;

	err = post_recv_path(clt_path);
	if (err)
		goto out;

	state = RTRS_CLT_CONNECTED;

out:
	rtrs_clt_update_wc_stats(con);
	rtrs_iu_free(iu, clt_path->s.dev->ib_dev, 1);
	rtrs_clt_change_state_get_old(clt_path, state, NULL);
}
static int rtrs_send_path_info(struct rtrs_clt_path *clt_path)
{
	struct rtrs_clt_con *usr_con = to_clt_con(clt_path->s.con[0]);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *tx_iu, *rx_iu;
	size_t rx_sz;
	int err;

	rx_sz  = sizeof(struct rtrs_msg_info_rsp);
	rx_sz += sizeof(struct rtrs_sg_desc) * clt_path->queue_depth;

	tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
			      clt_path->s.dev->ib_dev, DMA_TO_DEVICE,
			      rtrs_clt_info_req_done);
	rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, clt_path->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
	if (!tx_iu || !rx_iu) {
		err = -ENOMEM;
		goto out;
	}
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
	if (err) {
		rtrs_err(clt_path->clt, "rtrs_iu_post_recv(), err: %d\n", err);
		goto out;
	}
	rx_iu = NULL;

	msg = tx_iu->buf;
	msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
	memcpy(msg->pathname, clt_path->s.sessname, sizeof(msg->pathname));

	ib_dma_sync_single_for_device(clt_path->s.dev->ib_dev,
				      tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info request */
	err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
	if (err) {
		rtrs_err(clt_path->clt, "rtrs_iu_post_send(), err: %d\n", err);
		goto out;
	}
	tx_iu = NULL;

	/* Wait for state change */
	wait_event_interruptible_timeout(clt_path->state_wq,
					 clt_path->state != RTRS_CLT_CONNECTING,
					 msecs_to_jiffies(
						 RTRS_CONNECT_TIMEOUT_MS));
	if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED) {
		if (READ_ONCE(clt_path->state) == RTRS_CLT_CONNECTING_ERR)
			err = -ECONNRESET;
		else
			err = -ETIMEDOUT;
	}

out:
	if (tx_iu)
		rtrs_iu_free(tx_iu, clt_path->s.dev->ib_dev, 1);
	if (rx_iu)
		rtrs_iu_free(rx_iu, clt_path->s.dev->ib_dev, 1);
	if (err)
		/* If we've never taken async path because of malloc problems */
		rtrs_clt_change_state_get_old(clt_path,
					      RTRS_CLT_CONNECTING_ERR, NULL);

	return err;
}
/**
 * init_path() - establishes all path connections and does handshake
 * @clt_path: client path.
 * In case of error, a full close or reconnect procedure should be taken,
 * because reconnect or close async works can be started.
 */
static int init_path(struct rtrs_clt_path *clt_path)
{
	int err;
	char str[NAME_MAX];
	struct rtrs_addr path = {
		.src = &clt_path->s.src_addr,
		.dst = &clt_path->s.dst_addr,
	};

	rtrs_addr_to_str(&path, str, sizeof(str));

	mutex_lock(&clt_path->init_mutex);
	err = init_conns(clt_path);
	if (err) {
		rtrs_err(clt_path->clt,
			 "init_conns() failed: err=%d path=%s [%s:%u]\n", err,
			 str, clt_path->hca_name, clt_path->hca_port);
		goto out;
	}
	err = rtrs_send_path_info(clt_path);
	if (err) {
		rtrs_err(clt_path->clt,
			 "rtrs_send_path_info() failed: err=%d path=%s [%s:%u]\n",
			 err, str, clt_path->hca_name, clt_path->hca_port);
		goto out;
	}
	rtrs_clt_path_up(clt_path);
	rtrs_start_hb(&clt_path->s);
out:
	mutex_unlock(&clt_path->init_mutex);

	return err;
}
static void rtrs_clt_reconnect_work(struct work_struct *work)
{
	struct rtrs_clt_path *clt_path;
	struct rtrs_clt_sess *clt;
	int err;

	clt_path = container_of(to_delayed_work(work), struct rtrs_clt_path,
				reconnect_dwork);
	clt = clt_path->clt;

	trace_rtrs_clt_reconnect_work(clt_path);

	if (READ_ONCE(clt_path->state) != RTRS_CLT_RECONNECTING)
		return;

	if (clt_path->reconnect_attempts >= clt->max_reconnect_attempts) {
		/* Close a path completely if max attempts is reached */
		rtrs_clt_close_conns(clt_path, false);
		return;
	}
	clt_path->reconnect_attempts++;

	msleep(RTRS_RECONNECT_BACKOFF);
	if (rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING, NULL)) {
		err = init_path(clt_path);
		if (err)
			goto reconnect_again;
	}

	return;

reconnect_again:
	if (rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_RECONNECTING, NULL)) {
		clt_path->stats->reconnects.fail_cnt++;
		queue_work(rtrs_wq, &clt_path->err_recovery_work);
	}
}
static void rtrs_clt_dev_release(struct device *dev)
{
	struct rtrs_clt_sess *clt = container_of(dev, struct rtrs_clt_sess,
						 dev);

	mutex_destroy(&clt->paths_ev_mutex);
	mutex_destroy(&clt->paths_mutex);
	kfree(clt);
}
static struct rtrs_clt_sess *alloc_clt(const char *sessname, size_t paths_num,
				       u16 port, size_t pdu_sz, void *priv,
				       void (*link_ev)(void *priv,
						       enum rtrs_clt_link_ev ev),
				       unsigned int reconnect_delay_sec,
				       unsigned int max_reconnect_attempts)
{
	struct rtrs_clt_sess *clt;
	int err;

	if (!paths_num || paths_num > MAX_PATHS_NUM)
		return ERR_PTR(-EINVAL);

	if (strlen(sessname) >= sizeof(clt->sessname))
		return ERR_PTR(-EINVAL);

	clt = kzalloc(sizeof(*clt), GFP_KERNEL);
	if (!clt)
		return ERR_PTR(-ENOMEM);

	clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
	if (!clt->pcpu_path) {
		kfree(clt);
		return ERR_PTR(-ENOMEM);
	}

	clt->dev.class = &rtrs_clt_dev_class;
	clt->dev.release = rtrs_clt_dev_release;
	uuid_gen(&clt->paths_uuid);
	INIT_LIST_HEAD_RCU(&clt->paths_list);
	clt->paths_num = paths_num;
	clt->paths_up = MAX_PATHS_NUM;
	clt->port = port;
	clt->pdu_sz = pdu_sz;
	clt->max_segments = RTRS_MAX_SEGMENTS;
	clt->reconnect_delay_sec = reconnect_delay_sec;
	clt->max_reconnect_attempts = max_reconnect_attempts;
	clt->priv = priv;
	clt->link_ev = link_ev;
	clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
	strscpy(clt->sessname, sessname, sizeof(clt->sessname));
	init_waitqueue_head(&clt->permits_wait);
	mutex_init(&clt->paths_ev_mutex);
	mutex_init(&clt->paths_mutex);
	device_initialize(&clt->dev);

	err = dev_set_name(&clt->dev, "%s", sessname);
	if (err)
		goto err_put;

	/*
	 * Suppress user space notification until
	 * sysfs files are created
	 */
	dev_set_uevent_suppress(&clt->dev, true);
	err = device_add(&clt->dev);
	if (err)
		goto err_put;

	clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
	if (!clt->kobj_paths) {
		err = -ENOMEM;
		goto err_del;
	}
	err = rtrs_clt_create_sysfs_root_files(clt);
	if (err) {
		kobject_del(clt->kobj_paths);
		kobject_put(clt->kobj_paths);
		goto err_del;
	}
	dev_set_uevent_suppress(&clt->dev, false);
	kobject_uevent(&clt->dev.kobj, KOBJ_ADD);

	return clt;

err_del:
	device_del(&clt->dev);
err_put:
	free_percpu(clt->pcpu_path);
	put_device(&clt->dev);
	return ERR_PTR(err);
}
static void free_clt(struct rtrs_clt_sess *clt)
{
	free_percpu(clt->pcpu_path);

	/*
	 * release callback will free clt and destroy mutexes in last put
	 */
	device_unregister(&clt->dev);
}
/**
 * rtrs_clt_open() - Open a path to an RTRS server
 * @ops: holds the link event callback and the private pointer.
 * @pathname: name of the path to an RTRS server
 * @paths: Paths to be established, defined by their src and dst addresses
 * @paths_num: Number of elements in the @paths array
 * @port: port to be used by the RTRS session
 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
 * @reconnect_delay_sec: time between reconnect tries
 * @max_reconnect_attempts: Number of times to reconnect on error before giving
 *			    up, 0 for disabled, -1 for forever
 * @nr_poll_queues: number of polling-mode connections using the IB_POLL_DIRECT flag
 *
 * Starts session establishment with the rtrs_server. The function can block
 * up to ~2000ms before it returns.
 *
 * Return: a valid pointer on success, otherwise PTR_ERR.
 */
struct rtrs_clt_sess *rtrs_clt_open(struct rtrs_clt_ops *ops,
				    const char *pathname,
				    const struct rtrs_addr *paths,
				    size_t paths_num, u16 port,
				    size_t pdu_sz, u8 reconnect_delay_sec,
				    s16 max_reconnect_attempts, u32 nr_poll_queues)
{
	struct rtrs_clt_path *clt_path, *tmp;
	struct rtrs_clt_sess *clt;
	int err, i;

	if (strchr(pathname, '/') || strchr(pathname, '.')) {
		pr_err("pathname cannot contain / and .\n");
		err = -EINVAL;
		goto out;
	}

	clt = alloc_clt(pathname, paths_num, port, pdu_sz, ops->priv,
			ops->link_ev,
			reconnect_delay_sec,
			max_reconnect_attempts);
	if (IS_ERR(clt)) {
		err = PTR_ERR(clt);
		goto out;
	}
	for (i = 0; i < paths_num; i++) {
		struct rtrs_clt_path *clt_path;

		clt_path = alloc_path(clt, &paths[i], nr_cpu_ids,
				      nr_poll_queues);
		if (IS_ERR(clt_path)) {
			err = PTR_ERR(clt_path);
			goto close_all_path;
		}
		clt_path->for_new_clt = 1;
		list_add_tail_rcu(&clt_path->s.entry, &clt->paths_list);

		err = init_path(clt_path);
		if (err) {
			list_del_rcu(&clt_path->s.entry);
			rtrs_clt_close_conns(clt_path, true);
			free_percpu(clt_path->stats->pcpu_stats);
			kfree(clt_path->stats);
			free_path(clt_path);
			goto close_all_path;
		}

		err = rtrs_clt_create_path_files(clt_path);
		if (err) {
			list_del_rcu(&clt_path->s.entry);
			rtrs_clt_close_conns(clt_path, true);
			free_percpu(clt_path->stats->pcpu_stats);
			kfree(clt_path->stats);
			free_path(clt_path);
			goto close_all_path;
		}
	}
	err = alloc_permits(clt);
	if (err)
		goto close_all_path;

	return clt;

close_all_path:
	list_for_each_entry_safe(clt_path, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_path_files(clt_path, NULL);
		rtrs_clt_close_conns(clt_path, true);
		kobject_put(&clt_path->kobj);
	}
	rtrs_clt_destroy_sysfs_root(clt);
	free_clt(clt);

out:
	return ERR_PTR(err);
}
EXPORT_SYMBOL(rtrs_clt_open);
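/*
 * Editorial usage sketch (illustrative only, not part of this file): a ULP
 * might establish a session through the API above roughly as follows.  The
 * rtrs_clt_ops layout (.priv, .link_ev) is assumed from its use in this
 * file; names prefixed with "my_" are hypothetical and error handling is
 * abbreviated.
 *
 *	static void my_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 *	{
 *		// react to connect/disconnect notifications here
 *	}
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv    = my_dev,
 *		.link_ev = my_link_ev,
 *	};
 *	struct rtrs_clt_sess *sess;
 *
 *	sess = rtrs_clt_open(&ops, "my_session", paths, paths_num, port,
 *			     sizeof(struct my_iu), my_reconnect_delay_sec,
 *			     my_max_reconnect_attempts, my_nr_poll_queues);
 *	if (IS_ERR(sess))
 *		return PTR_ERR(sess);
 */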
/**
 * rtrs_clt_close() - Close a path
 * @clt: Session handle. Session is freed upon return.
 */
void rtrs_clt_close(struct rtrs_clt_sess *clt)
{
	struct rtrs_clt_path *clt_path, *tmp;

	/* Firstly forbid sysfs access */
	rtrs_clt_destroy_sysfs_root(clt);

	/* Now it is safe to iterate over all paths without locks */
	list_for_each_entry_safe(clt_path, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_close_conns(clt_path, true);
		rtrs_clt_destroy_path_files(clt_path, NULL);
		kobject_put(&clt_path->kobj);
	}
	free_clt(clt);
}
EXPORT_SYMBOL(rtrs_clt_close);
int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_path *clt_path)
{
	enum rtrs_clt_state old_state;
	int err = -EBUSY;
	bool changed;

	changed = rtrs_clt_change_state_get_old(clt_path,
						RTRS_CLT_RECONNECTING,
						&old_state);
	if (changed) {
		clt_path->reconnect_attempts = 0;
		rtrs_clt_stop_and_destroy_conns(clt_path);
		queue_delayed_work(rtrs_wq, &clt_path->reconnect_dwork, 0);
	}
	if (changed || old_state == RTRS_CLT_RECONNECTING) {
		/*
		 * flush_delayed_work() queues pending work for immediate
		 * execution, so do the flush if we have queued something
		 * right now or work is pending.
		 */
		flush_delayed_work(&clt_path->reconnect_dwork);
		err = (READ_ONCE(clt_path->state) ==
		       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
	}

	return err;
}
int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_path *clt_path,
				    const struct attribute *sysfs_self)
{
	enum rtrs_clt_state old_state;
	bool changed;

	/*
	 * Continue stopping path till state was changed to DEAD or
	 * state was observed as DEAD:
	 * 1. State was changed to DEAD - we were fast and nobody
	 *    invoked rtrs_clt_reconnect(), which can again start
	 *    reconnecting.
	 * 2. State was observed as DEAD - we have someone in parallel
	 *    removing the path.
	 */
	do {
		rtrs_clt_close_conns(clt_path, true);
		changed = rtrs_clt_change_state_get_old(clt_path,
							RTRS_CLT_DEAD,
							&old_state);
	} while (!changed && old_state != RTRS_CLT_DEAD);

	if (changed) {
		rtrs_clt_remove_path_from_arr(clt_path);
		rtrs_clt_destroy_path_files(clt_path, sysfs_self);
		kobject_put(&clt_path->kobj);
	}

	return 0;
}
void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt_sess *clt, int value)
{
	clt->max_reconnect_attempts = (unsigned int)value;
}

int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt_sess *clt)
{
	return (int)clt->max_reconnect_attempts;
}
/**
 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
 *
 * @dir:	READ/WRITE
 * @ops:	callback function to be called as confirmation, and the pointer.
 * @clt:	Session
 * @permit:	Preallocated permit
 * @vec:	Message that is sent to server together with the request.
 *		Sum of len of all @vec elements limited to <= IO_MSG_SIZE.
 *		Since the msg is copied internally it can be allocated on stack.
 * @nr:		Number of elements in @vec.
 * @data_len:	length of data sent to/from server
 * @sg:		Pages to be sent/received to/from server.
 * @sg_cnt:	Number of elements in the @sg
 *
 * Return:
 * 0:		Success
 * <0:		Error
 *
 * On dir=READ rtrs client will request a data transfer from Server to client.
 * The data that the server will respond with will be stored in @sg when
 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
 * On dir=WRITE rtrs client will rdma write data in sg to server side.
 */
int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
		     struct rtrs_clt_sess *clt, struct rtrs_permit *permit,
		     const struct kvec *vec, size_t nr, size_t data_len,
		     struct scatterlist *sg, unsigned int sg_cnt)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt_path *clt_path;

	enum dma_data_direction dma_dir;
	int err = -ECONNABORTED, i;
	size_t usr_len, hdr_len;
	struct path_it it;

	/* Get kvec length */
	for (i = 0, usr_len = 0; i < nr; i++)
		usr_len += vec[i].iov_len;

	if (dir == READ) {
		hdr_len = sizeof(struct rtrs_msg_rdma_read) +
			  sg_cnt * sizeof(struct rtrs_sg_desc);
		dma_dir = DMA_FROM_DEVICE;
	} else {
		hdr_len = sizeof(struct rtrs_msg_rdma_write);
		dma_dir = DMA_TO_DEVICE;
	}

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (clt_path = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
			continue;

		if (usr_len + hdr_len > clt_path->max_hdr_size) {
			rtrs_wrn_rl(clt_path->clt,
				    "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
				    dir == READ ? "Read" : "Write",
				    usr_len, hdr_len, clt_path->max_hdr_size);
			err = -EMSGSIZE;
			break;
		}
		req = rtrs_clt_get_req(clt_path, ops->conf_fn, permit, ops->priv,
				       vec, usr_len, sg, sg_cnt, data_len,
				       dma_dir);
		if (dir == READ)
			err = rtrs_clt_read_req(req);
		else
			err = rtrs_clt_write_req(req);
		if (err) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}
EXPORT_SYMBOL(rtrs_clt_request);
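/*
 * Editorial usage sketch (illustrative only, not part of this file): issuing
 * a READ through rtrs_clt_request() above.  The enum values RTRS_IO_CON and
 * RTRS_PERMIT_WAIT and the rtrs_clt_req_ops layout (.priv, .conf_fn) are
 * assumptions based on how they are used by this driver; names prefixed with
 * "my_" are hypothetical.
 *
 *	struct rtrs_permit *permit;
 *	struct kvec vec = {
 *		.iov_base = &my_msg,
 *		.iov_len  = sizeof(my_msg),
 *	};
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv    = my_io,
 *		.conf_fn = my_conf_fn,	// confirmation callback
 *	};
 *	int err;
 *
 *	permit = rtrs_clt_get_permit(sess, RTRS_IO_CON, RTRS_PERMIT_WAIT);
 *	err = rtrs_clt_request(READ, &req_ops, sess, permit, &vec, 1,
 *			       my_data_len, my_sg, my_sg_cnt);
 *	if (err)
 *		rtrs_clt_put_permit(sess, permit);
 */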
int rtrs_clt_rdma_cq_direct(struct rtrs_clt_sess *clt, unsigned int index)
{
	/* If no path, return -1 for block layer not to try again */
	int cnt = -1;
	struct rtrs_con *con;
	struct rtrs_clt_path *clt_path;
	struct path_it it;

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (clt_path = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
			continue;

		con = clt_path->s.con[index + 1];
		cnt = ib_process_cq_direct(con->cq, -1);
		if (cnt)
			break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return cnt;
}
EXPORT_SYMBOL(rtrs_clt_rdma_cq_direct);
/**
 * rtrs_clt_query() - queries RTRS session attributes
 * @clt: session pointer
 * @attr: query results for session attributes.
 *
 * Returns:
 *    0 on success
 *    -ECOMM, no connection to the server
 */
int rtrs_clt_query(struct rtrs_clt_sess *clt, struct rtrs_attrs *attr)
{
	if (!rtrs_clt_is_connected(clt))
		return -ECOMM;

	attr->queue_depth  = clt->queue_depth;
	attr->max_segments = clt->max_segments;
	/* Cap max_io_size to min of remote buffer size and the fr pages */
	attr->max_io_size = min_t(int, clt->max_io_size,
				  clt->max_segments * SZ_4K);

	return 0;
}
EXPORT_SYMBOL(rtrs_clt_query);
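/*
 * Editorial note (not part of the driver): with the default of
 * clt->max_segments == RTRS_MAX_SEGMENTS (128), the cap above limits
 * attr->max_io_size to at most 128 * 4 KiB = 512 KiB, matching the
 * module-wide segment limit defined at the top of this file.
 */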
int rtrs_clt_create_path_from_sysfs(struct rtrs_clt_sess *clt,
				    struct rtrs_addr *addr)
{
	struct rtrs_clt_path *clt_path;
	int err;

	clt_path = alloc_path(clt, addr, nr_cpu_ids, 0);
	if (IS_ERR(clt_path))
		return PTR_ERR(clt_path);

	mutex_lock(&clt->paths_mutex);
	if (clt->paths_num == 0) {
		/*
		 * When all the paths are removed for a session,
		 * the addition of the first path is like a new session for
		 * the storage server
		 */
		clt_path->for_new_clt = 1;
	}

	mutex_unlock(&clt->paths_mutex);

	/*
	 * It is totally safe to add path in CONNECTING state: coming
	 * IO will never grab it.  Also it is very important to add
	 * path before init, since init fires LINK_CONNECTED event.
	 */
	rtrs_clt_add_path_to_arr(clt_path);

	err = init_path(clt_path);
	if (err)
		goto close_path;

	err = rtrs_clt_create_path_files(clt_path);
	if (err)
		goto close_path;

	return 0;

close_path:
	rtrs_clt_remove_path_from_arr(clt_path);
	rtrs_clt_close_conns(clt_path, true);
	free_percpu(clt_path->stats->pcpu_stats);
	kfree(clt_path->stats);
	free_path(clt_path);

	return err;
}
static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
{
	if (!(dev->ib_dev->attrs.device_cap_flags &
	      IB_DEVICE_MEM_MGT_EXTENSIONS)) {
		pr_err("Memory registrations not supported.\n");
		return -ENOTSUPP;
	}

	return 0;
}

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
	.init = rtrs_clt_ib_dev_init
};
static int __init rtrs_client_init(void)
{
	int ret;

	rtrs_rdma_dev_pd_init(0, &dev_pd);
	ret = class_register(&rtrs_clt_dev_class);
	if (ret) {
		pr_err("Failed to create rtrs-client dev class\n");
		return ret;
	}
	rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
	if (!rtrs_wq) {
		class_unregister(&rtrs_clt_dev_class);
		return -ENOMEM;
	}

	return 0;
}

static void __exit rtrs_client_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_unregister(&rtrs_clt_dev_class);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_client_init);
module_exit(rtrs_client_exit);