// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/log2.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc);
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
static int rpcrdma_ep_destroy(struct rpcrdma_ep *ep);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);

/* Wait for outstanding transport work to finish. ib_drain_qp
 * handles the drains in the wrong order for us, so open code
 * them here.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rdma_cm_id *id = r_xprt->rx_ep->re_id;

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_rq(id->qp);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(id->qp);
}

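/* Note: ib_drain_qp() drains the SQ before the RQ. Draining the RQ
 * first lets deferred Reply processing run to completion and post
 * any Local Invalidate WRs it needs; draining the SQ afterwards
 * flushes those WRs as well, which is why the two drains are
 * open-coded above in that order.
 */
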
/**
 * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 * @event: details of the event
 * @context: ep that owns QP where event occurred
 *
 * Called from the RDMA provider (device driver) possibly in an interrupt
 * context. The QP is always destroyed before the ID, so the ID will be
 * reliably available when this handler is invoked.
 */
static void rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	trace_xprtrdma_qp_event(ep, event);
}

/**
 * rpcrdma_flush_disconnect - Disconnect on flushed completion
 * @cq: completion queue
 * @wc: work completion entry
 *
 * Must be called in process context.
 */
void rpcrdma_flush_disconnect(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rpcrdma_xprt *r_xprt = cq->cq_context;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	if (wc->status != IB_WC_SUCCESS &&
	    r_xprt->rx_ep->re_connect_status == 1) {
		r_xprt->rx_ep->re_connect_status = -ECONNABORTED;
		trace_xprtrdma_flush_dct(r_xprt, wc->status);
		xprt_force_disconnect(xprt);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue
 * @wc: WCE for a completed Send WR
 *
 */
static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
	rpcrdma_flush_disconnect(cq, wc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue
 * @wc: WCE for a completed Receive WR
 *
 */
static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);
	struct rpcrdma_xprt *r_xprt = cq->cq_context;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc);
	--r_xprt->rx_ep->re_receive_count;
	if (wc->status != IB_WC_SUCCESS)
		goto out_flushed;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

	rpcrdma_reply_handler(rep);
	return;

out_flushed:
	rpcrdma_flush_disconnect(cq, wc);
	rpcrdma_rep_destroy(rep);
}

static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
				      struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	ep->re_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		ep->re_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < ep->re_inline_recv)
		ep->re_inline_recv = rsize;
	if (wsize < ep->re_inline_send)
		ep->re_inline_send = wsize;

	rpcrdma_set_max_header_sizes(ep);
}

/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
	struct rpcrdma_ep *ep = id->context;
	struct rpc_xprt *xprt = ep->re_xprt;

	might_sleep();

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ep->re_async_rc = 0;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ep->re_async_rc = -EPROTO;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ep->re_async_rc = -ENETUNREACH;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		pr_info("rpcrdma: removing device %s for %pISpc\n",
			ep->re_id->device->name, sap);
		fallthrough;
	case RDMA_CM_EVENT_ADDR_CHANGE:
		ep->re_connect_status = -ENODEV;
		xprt_force_disconnect(xprt);
		goto disconnected;
	case RDMA_CM_EVENT_ESTABLISHED:
		kref_get(&ep->re_kref);
		ep->re_connect_status = 1;
		rpcrdma_update_cm_private(ep, &event->param.conn);
		trace_xprtrdma_inline_thresh(ep);
		wake_up_all(&ep->re_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->re_connect_status = -ENOTCONN;
		goto disconnected;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->re_connect_status = -ENETUNREACH;
		goto disconnected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %pISpc rejected: %s\n",
			sap, rdma_reject_msg(id, event->status));
		ep->re_connect_status = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->re_connect_status = -EAGAIN;
		goto disconnected;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->re_connect_status = -ECONNABORTED;
disconnected:
		return rpcrdma_ep_destroy(ep);
	default:
		break;
	}

	dprintk("RPC: %s: %pISpc on %s/frwr: %s\n", __func__, sap,
		ep->re_id->device->name, rdma_event_msg(event->event));
	return 0;
}

static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
					    struct rpcrdma_ep *ep)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ep->re_done);

	id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
			    RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id))
		return id;

	ep->re_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
	if (rc < 0)
		goto out;

	rc = ep->re_async_rc;
	if (rc)
		goto out;

	ep->re_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
	if (rc < 0)
		goto out;
	rc = ep->re_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

static void rpcrdma_ep_put(struct kref *kref)
{
	struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);

	if (ep->re_id->qp) {
		rdma_destroy_qp(ep->re_id);
		ep->re_id->qp = NULL;
	}

	if (ep->re_attr.recv_cq)
		ib_free_cq(ep->re_attr.recv_cq);
	ep->re_attr.recv_cq = NULL;
	if (ep->re_attr.send_cq)
		ib_free_cq(ep->re_attr.send_cq);
	ep->re_attr.send_cq = NULL;

	if (ep->re_pd)
		ib_dealloc_pd(ep->re_pd);
	ep->re_pd = NULL;

	kfree(ep);
	module_put(THIS_MODULE);
}

/* Returns:
 *     %0 if @ep still has a positive kref count, or
 *     %1 if @ep was destroyed successfully.
 */
static int rpcrdma_ep_destroy(struct rpcrdma_ep *ep)
{
	return kref_put(&ep->re_kref, rpcrdma_ep_put);
}

static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_connect_private *pmsg;
	struct ib_device *device;
	struct rdma_cm_id *id;
	struct rpcrdma_ep *ep;
	int rc;

	ep = kzalloc(sizeof(*ep), GFP_NOFS);
	if (!ep)
		return -ENOTCONN;
	ep->re_xprt = &r_xprt->rx_xprt;
	kref_init(&ep->re_kref);

	id = rpcrdma_create_id(r_xprt, ep);
	if (IS_ERR(id)) {
		kfree(ep);
		return PTR_ERR(id);
	}
	__module_get(THIS_MODULE);
	device = id->device;
	ep->re_id = id;

	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
	ep->re_inline_send = xprt_rdma_max_inline_write;
	ep->re_inline_recv = xprt_rdma_max_inline_read;
	rc = frwr_query_device(ep, device);
	if (rc)
		goto out_destroy;

	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);

	ep->re_attr.event_handler = rpcrdma_qp_event_handler;
	ep->re_attr.qp_context = ep;
	ep->re_attr.srq = NULL;
	ep->re_attr.cap.max_inline_data = 0;
	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->re_attr.qp_type = IB_QPT_RC;
	ep->re_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->re_attr.cap.max_send_wr,
		ep->re_attr.cap.max_recv_wr,
		ep->re_attr.cap.max_send_sge,
		ep->re_attr.cap.max_recv_sge);

	ep->re_send_batch = ep->re_max_requests >> 3;
	ep->re_send_count = ep->re_send_batch;
	init_waitqueue_head(&ep->re_connect_wait);

	ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
					      ep->re_attr.cap.max_send_wr,
					      IB_POLL_WORKQUEUE);
	if (IS_ERR(ep->re_attr.send_cq)) {
		rc = PTR_ERR(ep->re_attr.send_cq);
		goto out_destroy;
	}

	ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
					      ep->re_attr.cap.max_recv_wr,
					      IB_POLL_WORKQUEUE);
	if (IS_ERR(ep->re_attr.recv_cq)) {
		rc = PTR_ERR(ep->re_attr.recv_cq);
		goto out_destroy;
	}
	ep->re_receive_count = 0;

	/* Initialize cma parameters */
	memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg = &ep->re_cm_private;
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
	ep->re_remote_cma.private_data = pmsg;
	ep->re_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->re_remote_cma.initiator_depth = 0;
	ep->re_remote_cma.responder_resources =
		min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->re_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->re_remote_cma.flow_control = 0;
	ep->re_remote_cma.rnr_retry_count = 0;

	ep->re_pd = ib_alloc_pd(device, 0);
	if (IS_ERR(ep->re_pd)) {
		rc = PTR_ERR(ep->re_pd);
		goto out_destroy;
	}

	rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
	if (rc)
		goto out_destroy;

	r_xprt->rx_ep = ep;
	return 0;

out_destroy:
	rpcrdma_ep_destroy(ep);
	rdma_destroy_id(id);
	r_xprt->rx_ep = NULL;
	return rc;
}

/**
 * rpcrdma_xprt_connect - Connect an unconnected transport
 * @r_xprt: controlling transport instance
 *
 * Returns 0 on success or a negative errno.
 */
int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_ep *ep;
	int rc;

retry:
	rpcrdma_xprt_disconnect(r_xprt);
	rc = rpcrdma_ep_create(r_xprt);
	if (rc)
		return rc;
	ep = r_xprt->rx_ep;

	ep->re_connect_status = 0;
	xprt_clear_connected(xprt);

	rpcrdma_reset_cwnd(r_xprt);
	rpcrdma_post_recvs(r_xprt, true);

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
	if (rc)
		goto out;

	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	wait_event_interruptible(ep->re_connect_wait,
				 ep->re_connect_status != 0);
	if (ep->re_connect_status <= 0) {
		if (ep->re_connect_status == -EAGAIN)
			goto retry;
		rc = ep->re_connect_status;
		goto out;
	}

	rc = rpcrdma_reqs_setup(r_xprt);
	if (rc) {
		rpcrdma_xprt_disconnect(r_xprt);
		goto out;
	}
	rpcrdma_mrs_create(r_xprt);

out:
	if (rc)
		ep->re_connect_status = rc;
	trace_xprtrdma_connect(r_xprt, rc);
	return rc;
}

/**
 * rpcrdma_xprt_disconnect - Disconnect underlying transport
 * @r_xprt: controlling transport instance
 *
 * Caller serializes. Either the transport send lock is held,
 * or we're being called to destroy the transport.
 *
 * On return, @r_xprt is completely divested of all hardware
 * resources and prepared for the next ->connect operation.
 */
void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rdma_cm_id *id;
	int rc;

	if (!ep)
		return;

	id = ep->re_id;
	rc = rdma_disconnect(id);
	trace_xprtrdma_disconnect(r_xprt, rc);

	rpcrdma_xprt_drain(r_xprt);
	rpcrdma_reps_unmap(r_xprt);
	rpcrdma_reqs_reset(r_xprt);
	rpcrdma_mrs_destroy(r_xprt);
	rpcrdma_sendctxs_destroy(r_xprt);

	if (rpcrdma_ep_destroy(ep))
		rdma_destroy_id(id);

	r_xprt->rx_ep = NULL;
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 * Send requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long i;

	if (!buf->rb_sc_ctxs)
		return;
	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
	buf->rb_sc_ctxs = NULL;
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
		if (!sc)
			return -ENOMEM;

		buf->rb_sc_ctxs[i] = sc;
	}

	buf->rb_sc_head = 0;
	buf->rb_sc_tail = 0;
	return 0;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @r_xprt: controlling transport instance
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per transport), and
 * provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

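/* Note: the READ_ONCE() of rb_sc_tail above pairs with the
 * smp_store_release() in rpcrdma_sendctx_put_locked(). A sendctx is
 * never handed out until the Send completion path has finished
 * unmapping it and published the new tail.
 */
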
/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @r_xprt: controlling transport instance
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctxt
 * to the queue.
 *
 * The caller serializes calls to this function (per transport).
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	xprt_write_space(&r_xprt->rx_xprt);
}

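/* Note: because only some Sends are signaled (see rpcrdma_post_sends),
 * one completion covers every unsignaled Send posted before it. The
 * loop above therefore retires and unmaps all of those sendctxs in a
 * single pass.
 */
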
static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int count;

	for (count = 0; count < ep->re_max_rdma_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_NOFS);
		if (!mr)
			break;

		rc = frwr_mr_init(r_xprt, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		spin_lock(&buf->rb_lock);
		rpcrdma_mr_push(mr, &buf->rb_mrs);
		list_add(&mr->mr_all, &buf->rb_all_mrs);
		spin_unlock(&buf->rb_lock);
	}

	r_xprt->rx_stats.mrs_allocated += count;
	trace_xprtrdma_createmrs(r_xprt, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
	xprt_write_space(&r_xprt->rx_xprt);
}

/**
 * rpcrdma_mrs_refresh - Wake the MR refresh worker
 * @r_xprt: controlling transport instance
 *
 */
void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;

	/* If there is no underlying connection, it's no use
	 * to wake the refresh worker.
	 */
	if (ep->re_connect_status == 1) {
		/* The work is scheduled on a WQ_MEM_RECLAIM
		 * workqueue in order to prevent MR allocation
		 * from recursing into NFS during direct reclaim.
		 */
		queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
	}
}

/**
 * rpcrdma_req_create - Allocate an rpcrdma_req object
 * @r_xprt: controlling r_xprt
 * @size: initial size, in bytes, of send and receive buffers
 * @flags: GFP flags passed to memory allocators
 *
 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 */
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
				       gfp_t flags)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), flags);
	if (req == NULL)
		goto out1;

	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
	if (!req->rl_sendbuf)
		goto out2;

	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
	if (!req->rl_recvbuf)
		goto out3;

	INIT_LIST_HEAD(&req->rl_free_mrs);
	INIT_LIST_HEAD(&req->rl_registered);
	spin_lock(&buffer->rb_lock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_lock);
	return req;

out3:
	kfree(req->rl_sendbuf);
out2:
	kfree(req);
out1:
	return NULL;
}

/**
 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req object to set up
 *
 * Returns zero on success, and a negative errno on failure.
 */
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct rpcrdma_regbuf *rb;
	size_t maxhdrsize;

	/* Compute maximum header buffer size in bytes */
	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
		     r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
	maxhdrsize *= sizeof(__be32);
	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (!rb)
		goto out;

	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
		goto out_free;

	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
	return 0;

out_free:
	rpcrdma_regbuf_free(rb);
out:
	return -ENOMEM;
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	int rc;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rc = rpcrdma_req_setup(r_xprt, req);
		if (rc)
			return rc;
	}
	return 0;
}

static void rpcrdma_req_reset(struct rpcrdma_req *req)
{
	/* Credits are valid for only one connection */
	req->rl_slot.rq_cong = 0;

	rpcrdma_regbuf_free(req->rl_rdmabuf);
	req->rl_rdmabuf = NULL;

	rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
	rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all)
		rpcrdma_req_reset(req);
}

/* No locking needed here. This function is called only by the
 * Receive completion handler.
 */
static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
					      bool temp)
{
	struct rpcrdma_rep *rep;

	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (!rep->rr_rdmabuf)
		goto out_free;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
		goto out_free_regbuf;

	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;

	list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
	return rep;

out_free_regbuf:
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
out_free:
	kfree(rep);
out:
	return NULL;
}

/* No locking needed here. This function is invoked only by the
 * Receive completion handler, or during transport shutdown.
 */
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
	list_del(&rep->rr_all);
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
}

static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
	struct llist_node *node;

	/* Calls to llist_del_first are required to be serialized */
	node = llist_del_first(&buf->rb_free_reps);
	if (!node)
		return NULL;
	return llist_entry(node, struct rpcrdma_rep, rr_node);
}

static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
			    struct rpcrdma_rep *rep)
{
	llist_add(&rep->rr_node, &buf->rb_free_reps);
}

static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;

	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
		rep->rr_temp = true;
	}
}

static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
		rpcrdma_rep_destroy(rep);
}

/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	INIT_LIST_HEAD(&buf->rb_all_reps);

	rc = -ENOMEM;
	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	init_llist_head(&buf->rb_free_reps);

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * Relies on caller holding the transport send lock to protect
 * removing req->rl_all from buf->rb_all_reqs safely.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	list_del(&req->rl_all);

	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;

		spin_lock(&buf->rb_lock);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_release_mr(mr);
	}

	rpcrdma_regbuf_free(req->rl_recvbuf);
	rpcrdma_regbuf_free(req->rl_sendbuf);
	rpcrdma_regbuf_free(req->rl_rdmabuf);
	kfree(req);
}

/**
 * rpcrdma_mrs_destroy - Release all of a transport's MRs
 * @r_xprt: controlling transport instance
 *
 * Relies on caller holding the transport send lock to protect
 * removing mr->mr_list from req->rl_free_mrs safely.
 */
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	cancel_work_sync(&buf->rb_refresh_worker);

	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_list);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_release_mr(mr);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior rpcrdma_xprt_drain :
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	rpcrdma_reps_destroy(buf);

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}

/**
 * rpcrdma_mr_put - DMA unmap an MR and release it
 * @mr: MR to release
 *
 */
void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ep->re_id->device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}

	rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	if (req->rl_reply)
		rpcrdma_rep_put(buffers, req->rl_reply);
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
 * @rep: rep to release
 *
 * Used after error conditions.
 */
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb), flags);
	if (!rb)
		return NULL;
	rb->rg_data = kmalloc(size, flags);
	if (!rb->rg_data) {
		kfree(rb);
		return NULL;
	}

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;
	return rb;
}

/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

	rb->rg_data = buf;
	rb->rg_iov.length = size;
	return true;
}

/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
 * @r_xprt: controlling transport instance
 * @rb: regbuf to be mapped
 *
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ep->re_id->device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
					    rdmab_length(rb), rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
	return true;
}

static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
			    rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	rpcrdma_regbuf_dma_unmap(rb);
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

/**
 * rpcrdma_post_sends - Post WRs to a transport's Send Queue
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req containing the Send WR to post
 *
 * Returns 0 if the post was successful, otherwise -ENOTCONN
 * is returned.
 */
int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_wr;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	int rc;

	if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->re_send_count = ep->re_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->re_send_count;
	}

	rc = frwr_send(r_xprt, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

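/* Note: Send completions are batched. Only one Send in each
 * re_send_batch posts carries IB_SEND_SIGNALED, keeping completion
 * overhead low. A Send is also signaled when the request still holds
 * extra references (kref_read() > 1) so that its completion is
 * reported without waiting for the next batch boundary.
 */
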
/**
 * rpcrdma_post_recvs - Refill the Receive Queue
 * @r_xprt: controlling transport instance
 * @temp: mark Receive buffers to be deleted after use
 *
 */
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int needed, count, rc;

	rc = 0;
	count = 0;

	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (likely(ep->re_receive_count > needed))
		goto out;
	needed -= ep->re_receive_count;
	if (!temp)
		needed += RPCRDMA_MAX_RECV_BATCH;

	/* fast path: all needed reps can be found on the free list */
	wr = NULL;
	while (needed) {
		rep = rpcrdma_rep_get_locked(buf);
		if (rep && rep->rr_temp) {
			rpcrdma_rep_destroy(rep);
			continue;
		}
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt, temp);
		if (!rep)
			break;

		trace_xprtrdma_post_recv(rep);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		--needed;
		++count;
	}
	if (!wr)
		goto out;

	rc = ib_post_recv(ep->re_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
out:
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
	if (rc) {
		for (wr = bad_wr; wr;) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	ep->re_receive_count += count;
	return;
}

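/* Note: the Receive Queue is kept ahead of the advertised credit
 * limit. "needed" starts from the credit count (plus room for
 * backchannel requests), subtracts what is already posted, and is
 * padded by RPCRDMA_MAX_RECV_BATCH so refills happen in batches
 * rather than one WR at a time.
 */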