// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2018 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */
8 #include <linux/sunrpc/svc_rdma.h>
10 #include "xprt_rdma.h"
11 #include <trace/events/rpcrdma.h>
13 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
15 #undef SVCRDMA_BACKCHANNEL_DEBUG
18 * svc_rdma_handle_bc_reply - Process incoming backchannel reply
19 * @xprt: controlling backchannel transport
20 * @rdma_resp: pointer to incoming transport header
21 * @rcvbuf: XDR buffer into which to decode the reply
24 * %0 if @rcvbuf is filled in, xprt_complete_rqst called,
25 * %-EAGAIN if server should call ->recvfrom again.
27 int svc_rdma_handle_bc_reply(struct rpc_xprt
*xprt
, __be32
*rdma_resp
,
28 struct xdr_buf
*rcvbuf
)
30 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
31 struct kvec
*dst
, *src
= &rcvbuf
->head
[0];
39 p
= (__be32
*)src
->iov_base
;
43 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
44 pr_info("%s: xid=%08x, length=%zu\n",
45 __func__
, be32_to_cpu(xid
), len
);
46 pr_info("%s: RPC/RDMA: %*ph\n",
47 __func__
, (int)RPCRDMA_HDRLEN_MIN
, rdma_resp
);
48 pr_info("%s: RPC: %*ph\n",
49 __func__
, (int)len
, p
);
53 if (src
->iov_len
< 24)
56 spin_lock(&xprt
->queue_lock
);
57 req
= xprt_lookup_rqst(xprt
, xid
);
61 dst
= &req
->rq_private_buf
.head
[0];
62 memcpy(&req
->rq_private_buf
, &req
->rq_rcv_buf
, sizeof(struct xdr_buf
));
63 if (dst
->iov_len
< len
)
65 memcpy(dst
->iov_base
, p
, len
);
67 spin_unlock(&xprt
->queue_lock
);
69 credits
= be32_to_cpup(rdma_resp
+ 2);
71 credits
= 1; /* don't deadlock */
72 else if (credits
> r_xprt
->rx_buf
.rb_bc_max_requests
)
73 credits
= r_xprt
->rx_buf
.rb_bc_max_requests
;
75 spin_lock(&xprt
->transport_lock
);
76 xprt
->cwnd
= credits
<< RPC_CWNDSHIFT
;
77 spin_unlock(&xprt
->transport_lock
);
79 spin_lock(&xprt
->queue_lock
);
81 xprt_complete_rqst(req
->rq_task
, rcvbuf
->len
);
86 spin_unlock(&xprt
->queue_lock
);
91 dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
96 dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
97 xprt
, be32_to_cpu(xid
));
101 /* Send a backwards direction RPC call.
103 * Caller holds the connection's mutex and has already marshaled
104 * the RPC/RDMA request.
106 * This is similar to svc_rdma_send_reply_msg, but takes a struct
107 * rpc_rqst instead, does not support chunks, and avoids blocking
110 * XXX: There is still an opportunity to block in svc_rdma_send()
111 * if there are no SQ entries to post the Send. This may occur if
112 * the adapter has a small maximum SQ depth.
114 static int svc_rdma_bc_sendto(struct svcxprt_rdma
*rdma
,
115 struct rpc_rqst
*rqst
,
116 struct svc_rdma_send_ctxt
*ctxt
)
120 ret
= svc_rdma_map_reply_msg(rdma
, ctxt
, NULL
, &rqst
->rq_snd_buf
);
124 /* Bump page refcnt so Send completion doesn't release
125 * the rq_buffer before all retransmits are complete.
127 get_page(virt_to_page(rqst
->rq_buffer
));
128 ctxt
->sc_send_wr
.opcode
= IB_WR_SEND
;
129 return svc_rdma_send(rdma
, &ctxt
->sc_send_wr
);
132 /* Server-side transport endpoint wants a whole page for its send
133 * buffer. The client RPC code constructs the RPC header in this
134 * buffer before it invokes ->send_request.
137 xprt_rdma_bc_allocate(struct rpc_task
*task
)
139 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
140 size_t size
= rqst
->rq_callsize
;
143 if (size
> PAGE_SIZE
) {
144 WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
149 page
= alloc_page(RPCRDMA_DEF_GFP
);
152 rqst
->rq_buffer
= page_address(page
);
154 rqst
->rq_rbuffer
= kmalloc(rqst
->rq_rcvsize
, RPCRDMA_DEF_GFP
);
155 if (!rqst
->rq_rbuffer
) {
163 xprt_rdma_bc_free(struct rpc_task
*task
)
165 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
167 put_page(virt_to_page(rqst
->rq_buffer
));
168 kfree(rqst
->rq_rbuffer
);
172 rpcrdma_bc_send_request(struct svcxprt_rdma
*rdma
, struct rpc_rqst
*rqst
)
174 struct rpc_xprt
*xprt
= rqst
->rq_xprt
;
175 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
176 struct svc_rdma_send_ctxt
*ctxt
;
180 ctxt
= svc_rdma_send_ctxt_get(rdma
);
182 goto drop_connection
;
184 p
= xdr_reserve_space(&ctxt
->sc_stream
, RPCRDMA_HDRLEN_MIN
);
188 *p
++ = rpcrdma_version
;
189 *p
++ = cpu_to_be32(r_xprt
->rx_buf
.rb_bc_max_requests
);
195 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
196 pr_info("%s: %*ph\n", __func__
, 64, rqst
->rq_buffer
);
199 rqst
->rq_xtime
= ktime_get();
200 rc
= svc_rdma_bc_sendto(rdma
, rqst
, ctxt
);
206 svc_rdma_send_ctxt_put(rdma
, ctxt
);
209 dprintk("svcrdma: failed to send bc call\n");
213 /* Send an RPC call on the passive end of a transport
217 xprt_rdma_bc_send_request(struct rpc_rqst
*rqst
)
219 struct svc_xprt
*sxprt
= rqst
->rq_xprt
->bc_xprt
;
220 struct svcxprt_rdma
*rdma
;
223 dprintk("svcrdma: sending bc call with xid: %08x\n",
224 be32_to_cpu(rqst
->rq_xid
));
226 mutex_lock(&sxprt
->xpt_mutex
);
229 rdma
= container_of(sxprt
, struct svcxprt_rdma
, sc_xprt
);
230 if (!test_bit(XPT_DEAD
, &sxprt
->xpt_flags
)) {
231 ret
= rpcrdma_bc_send_request(rdma
, rqst
);
232 if (ret
== -ENOTCONN
)
233 svc_close_xprt(sxprt
);
236 mutex_unlock(&sxprt
->xpt_mutex
);
244 xprt_rdma_bc_close(struct rpc_xprt
*xprt
)
246 dprintk("svcrdma: %s: xprt %p\n", __func__
, xprt
);
247 xprt
->cwnd
= RPC_CWNDSHIFT
;
/* Destroy the backchannel rpc_xprt: release its formatted
 * address strings, then free the transport itself.
 */
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);
}
259 static const struct rpc_xprt_ops xprt_rdma_bc_procs
= {
260 .reserve_xprt
= xprt_reserve_xprt_cong
,
261 .release_xprt
= xprt_release_xprt_cong
,
262 .alloc_slot
= xprt_alloc_slot
,
263 .free_slot
= xprt_free_slot
,
264 .release_request
= xprt_release_rqst_cong
,
265 .buf_alloc
= xprt_rdma_bc_allocate
,
266 .buf_free
= xprt_rdma_bc_free
,
267 .send_request
= xprt_rdma_bc_send_request
,
268 .wait_for_reply_request
= xprt_wait_for_reply_request_def
,
269 .close
= xprt_rdma_bc_close
,
270 .destroy
= xprt_rdma_bc_put
,
271 .print_stats
= xprt_rdma_print_stats
274 static const struct rpc_timeout xprt_rdma_bc_timeout
= {
275 .to_initval
= 60 * HZ
,
276 .to_maxval
= 60 * HZ
,
279 /* It shouldn't matter if the number of backchannel session slots
280 * doesn't match the number of RPC/RDMA credits. That just means
281 * one or the other will have extra slots that aren't used.
283 static struct rpc_xprt
*
284 xprt_setup_rdma_bc(struct xprt_create
*args
)
286 struct rpc_xprt
*xprt
;
287 struct rpcrdma_xprt
*new_xprt
;
289 if (args
->addrlen
> sizeof(xprt
->addr
)) {
290 dprintk("RPC: %s: address too large\n", __func__
);
291 return ERR_PTR(-EBADF
);
294 xprt
= xprt_alloc(args
->net
, sizeof(*new_xprt
),
295 RPCRDMA_MAX_BC_REQUESTS
,
296 RPCRDMA_MAX_BC_REQUESTS
);
298 dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
300 return ERR_PTR(-ENOMEM
);
303 xprt
->timeout
= &xprt_rdma_bc_timeout
;
304 xprt_set_bound(xprt
);
305 xprt_set_connected(xprt
);
306 xprt
->bind_timeout
= RPCRDMA_BIND_TO
;
307 xprt
->reestablish_timeout
= RPCRDMA_INIT_REEST_TO
;
308 xprt
->idle_timeout
= RPCRDMA_IDLE_DISC_TO
;
310 xprt
->prot
= XPRT_TRANSPORT_BC_RDMA
;
311 xprt
->ops
= &xprt_rdma_bc_procs
;
313 memcpy(&xprt
->addr
, args
->dstaddr
, args
->addrlen
);
314 xprt
->addrlen
= args
->addrlen
;
315 xprt_rdma_format_addresses(xprt
, (struct sockaddr
*)&xprt
->addr
);
318 xprt
->max_payload
= xprt_rdma_max_inline_read
;
320 new_xprt
= rpcx_to_rdmax(xprt
);
321 new_xprt
->rx_buf
.rb_bc_max_requests
= xprt
->max_reqs
;
324 args
->bc_xprt
->xpt_bc_xprt
= xprt
;
325 xprt
->bc_xprt
= args
->bc_xprt
;
327 /* Final put for backchannel xprt is in __svc_rdma_free */
332 struct xprt_class xprt_rdma_bc
= {
333 .list
= LIST_HEAD_INIT(xprt_rdma_bc
.list
),
334 .name
= "rdma backchannel",
335 .owner
= THIS_MODULE
,
336 .ident
= XPRT_TRANSPORT_BC_RDMA
,
337 .setup
= xprt_setup_rdma_bc
,