1 // SPDX-License-Identifier: GPL-2.0
3 * Copyright (c) 2015 Oracle. All rights reserved.
5 * Support for backward direction RPCs on RPC/RDMA (server-side).
8 #include <linux/module.h>
9 #include <linux/sunrpc/svc_rdma.h>
10 #include "xprt_rdma.h"
12 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
14 #undef SVCRDMA_BACKCHANNEL_DEBUG
17 * svc_rdma_handle_bc_reply - Process incoming backchannel reply
18 * @xprt: controlling backchannel transport
19 * @rdma_resp: pointer to incoming transport header
20 * @rcvbuf: XDR buffer into which to decode the reply
23 * %0 if @rcvbuf is filled in, xprt_complete_rqst called,
24 * %-EAGAIN if server should call ->recvfrom again.
26 int svc_rdma_handle_bc_reply(struct rpc_xprt
*xprt
, __be32
*rdma_resp
,
27 struct xdr_buf
*rcvbuf
)
29 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
30 struct kvec
*dst
, *src
= &rcvbuf
->head
[0];
39 p
= (__be32
*)src
->iov_base
;
43 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
44 pr_info("%s: xid=%08x, length=%zu\n",
45 __func__
, be32_to_cpu(xid
), len
);
46 pr_info("%s: RPC/RDMA: %*ph\n",
47 __func__
, (int)RPCRDMA_HDRLEN_MIN
, rdma_resp
);
48 pr_info("%s: RPC: %*ph\n",
49 __func__
, (int)len
, p
);
53 if (src
->iov_len
< 24)
56 spin_lock(&xprt
->recv_lock
);
57 req
= xprt_lookup_rqst(xprt
, xid
);
61 dst
= &req
->rq_private_buf
.head
[0];
62 memcpy(&req
->rq_private_buf
, &req
->rq_rcv_buf
, sizeof(struct xdr_buf
));
63 if (dst
->iov_len
< len
)
65 memcpy(dst
->iov_base
, p
, len
);
67 credits
= be32_to_cpup(rdma_resp
+ 2);
69 credits
= 1; /* don't deadlock */
70 else if (credits
> r_xprt
->rx_buf
.rb_bc_max_requests
)
71 credits
= r_xprt
->rx_buf
.rb_bc_max_requests
;
73 spin_lock_bh(&xprt
->transport_lock
);
75 xprt
->cwnd
= credits
<< RPC_CWNDSHIFT
;
76 if (xprt
->cwnd
> cwnd
)
77 xprt_release_rqst_cong(req
->rq_task
);
78 spin_unlock_bh(&xprt
->transport_lock
);
82 xprt_complete_rqst(req
->rq_task
, rcvbuf
->len
);
86 spin_unlock(&xprt
->recv_lock
);
91 dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
96 dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
97 xprt
, be32_to_cpu(xid
));
102 /* Send a backwards direction RPC call.
104 * Caller holds the connection's mutex and has already marshaled
105 * the RPC/RDMA request.
107 * This is similar to svc_rdma_send_reply_msg, but takes a struct
108 * rpc_rqst instead, does not support chunks, and avoids blocking
111 * XXX: There is still an opportunity to block in svc_rdma_send()
112 * if there are no SQ entries to post the Send. This may occur if
113 * the adapter has a small maximum SQ depth.
115 static int svc_rdma_bc_sendto(struct svcxprt_rdma
*rdma
,
116 struct rpc_rqst
*rqst
)
118 struct svc_rdma_op_ctxt
*ctxt
;
121 ctxt
= svc_rdma_get_context(rdma
);
123 /* rpcrdma_bc_send_request builds the transport header and
124 * the backchannel RPC message in the same buffer. Thus only
125 * one SGE is needed to send both.
127 ret
= svc_rdma_map_reply_hdr(rdma
, ctxt
, rqst
->rq_buffer
,
128 rqst
->rq_snd_buf
.len
);
132 ret
= svc_rdma_repost_recv(rdma
, GFP_NOIO
);
136 ret
= svc_rdma_post_send_wr(rdma
, ctxt
, 1, 0);
141 dprintk("svcrdma: %s returns %d\n", __func__
, ret
);
145 svc_rdma_unmap_dma(ctxt
);
146 svc_rdma_put_context(ctxt
, 1);
151 /* Server-side transport endpoint wants a whole page for its send
152 * buffer. The client RPC code constructs the RPC header in this
153 * buffer before it invokes ->send_request.
156 xprt_rdma_bc_allocate(struct rpc_task
*task
)
158 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
159 size_t size
= rqst
->rq_callsize
;
162 if (size
> PAGE_SIZE
) {
163 WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
168 /* svc_rdma_sendto releases this page */
169 page
= alloc_page(RPCRDMA_DEF_GFP
);
172 rqst
->rq_buffer
= page_address(page
);
174 rqst
->rq_rbuffer
= kmalloc(rqst
->rq_rcvsize
, RPCRDMA_DEF_GFP
);
175 if (!rqst
->rq_rbuffer
) {
183 xprt_rdma_bc_free(struct rpc_task
*task
)
185 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
187 kfree(rqst
->rq_rbuffer
);
191 rpcrdma_bc_send_request(struct svcxprt_rdma
*rdma
, struct rpc_rqst
*rqst
)
193 struct rpc_xprt
*xprt
= rqst
->rq_xprt
;
194 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
198 /* Space in the send buffer for an RPC/RDMA header is reserved
199 * via xprt->tsh_size.
203 *p
++ = rpcrdma_version
;
204 *p
++ = cpu_to_be32(r_xprt
->rx_buf
.rb_bc_max_requests
);
210 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
211 pr_info("%s: %*ph\n", __func__
, 64, rqst
->rq_buffer
);
214 rc
= svc_rdma_bc_sendto(rdma
, rqst
);
216 goto drop_connection
;
220 dprintk("svcrdma: failed to send bc call\n");
221 xprt_disconnect_done(xprt
);
225 /* Send an RPC call on the passive end of a transport
229 xprt_rdma_bc_send_request(struct rpc_task
*task
)
231 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
232 struct svc_xprt
*sxprt
= rqst
->rq_xprt
->bc_xprt
;
233 struct svcxprt_rdma
*rdma
;
236 dprintk("svcrdma: sending bc call with xid: %08x\n",
237 be32_to_cpu(rqst
->rq_xid
));
239 if (!mutex_trylock(&sxprt
->xpt_mutex
)) {
240 rpc_sleep_on(&sxprt
->xpt_bc_pending
, task
, NULL
);
241 if (!mutex_trylock(&sxprt
->xpt_mutex
))
243 rpc_wake_up_queued_task(&sxprt
->xpt_bc_pending
, task
);
247 rdma
= container_of(sxprt
, struct svcxprt_rdma
, sc_xprt
);
248 if (!test_bit(XPT_DEAD
, &sxprt
->xpt_flags
))
249 ret
= rpcrdma_bc_send_request(rdma
, rqst
);
251 mutex_unlock(&sxprt
->xpt_mutex
);
259 xprt_rdma_bc_close(struct rpc_xprt
*xprt
)
261 dprintk("svcrdma: %s: xprt %p\n", __func__
, xprt
);
265 xprt_rdma_bc_put(struct rpc_xprt
*xprt
)
267 dprintk("svcrdma: %s: xprt %p\n", __func__
, xprt
);
270 module_put(THIS_MODULE
);
273 static const struct rpc_xprt_ops xprt_rdma_bc_procs
= {
274 .reserve_xprt
= xprt_reserve_xprt_cong
,
275 .release_xprt
= xprt_release_xprt_cong
,
276 .alloc_slot
= xprt_alloc_slot
,
277 .release_request
= xprt_release_rqst_cong
,
278 .buf_alloc
= xprt_rdma_bc_allocate
,
279 .buf_free
= xprt_rdma_bc_free
,
280 .send_request
= xprt_rdma_bc_send_request
,
281 .set_retrans_timeout
= xprt_set_retrans_timeout_def
,
282 .close
= xprt_rdma_bc_close
,
283 .destroy
= xprt_rdma_bc_put
,
284 .print_stats
= xprt_rdma_print_stats
287 static const struct rpc_timeout xprt_rdma_bc_timeout
= {
288 .to_initval
= 60 * HZ
,
289 .to_maxval
= 60 * HZ
,
292 /* It shouldn't matter if the number of backchannel session slots
293 * doesn't match the number of RPC/RDMA credits. That just means
294 * one or the other will have extra slots that aren't used.
296 static struct rpc_xprt
*
297 xprt_setup_rdma_bc(struct xprt_create
*args
)
299 struct rpc_xprt
*xprt
;
300 struct rpcrdma_xprt
*new_xprt
;
302 if (args
->addrlen
> sizeof(xprt
->addr
)) {
303 dprintk("RPC: %s: address too large\n", __func__
);
304 return ERR_PTR(-EBADF
);
307 xprt
= xprt_alloc(args
->net
, sizeof(*new_xprt
),
308 RPCRDMA_MAX_BC_REQUESTS
,
309 RPCRDMA_MAX_BC_REQUESTS
);
311 dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
313 return ERR_PTR(-ENOMEM
);
316 xprt
->timeout
= &xprt_rdma_bc_timeout
;
317 xprt_set_bound(xprt
);
318 xprt_set_connected(xprt
);
319 xprt
->bind_timeout
= RPCRDMA_BIND_TO
;
320 xprt
->reestablish_timeout
= RPCRDMA_INIT_REEST_TO
;
321 xprt
->idle_timeout
= RPCRDMA_IDLE_DISC_TO
;
323 xprt
->prot
= XPRT_TRANSPORT_BC_RDMA
;
324 xprt
->tsh_size
= RPCRDMA_HDRLEN_MIN
/ sizeof(__be32
);
325 xprt
->ops
= &xprt_rdma_bc_procs
;
327 memcpy(&xprt
->addr
, args
->dstaddr
, args
->addrlen
);
328 xprt
->addrlen
= args
->addrlen
;
329 xprt_rdma_format_addresses(xprt
, (struct sockaddr
*)&xprt
->addr
);
332 xprt
->max_payload
= xprt_rdma_max_inline_read
;
334 new_xprt
= rpcx_to_rdmax(xprt
);
335 new_xprt
->rx_buf
.rb_bc_max_requests
= xprt
->max_reqs
;
338 args
->bc_xprt
->xpt_bc_xprt
= xprt
;
339 xprt
->bc_xprt
= args
->bc_xprt
;
341 if (!try_module_get(THIS_MODULE
))
344 /* Final put for backchannel xprt is in __svc_rdma_free */
349 xprt_rdma_free_addresses(xprt
);
350 args
->bc_xprt
->xpt_bc_xprt
= NULL
;
351 args
->bc_xprt
->xpt_bc_xps
= NULL
;
354 return ERR_PTR(-EINVAL
);
357 struct xprt_class xprt_rdma_bc
= {
358 .list
= LIST_HEAD_INIT(xprt_rdma_bc
.list
),
359 .name
= "rdma backchannel",
360 .owner
= THIS_MODULE
,
361 .ident
= XPRT_TRANSPORT_BC_RDMA
,
362 .setup
= xprt_setup_rdma_bc
,