rpcrdma_rep_put(&r_xprt->rx_buf, rep);
req->rl_reply = NULL;
- spin_lock(&xprt->bc_pa_lock);
- list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
- spin_unlock(&xprt->bc_pa_lock);
+ rpcrdma_req_put(req);
xprt_put(xprt);
}
rqst->rq_xprt = xprt;
__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+ kref_init(&req->rl_kref);
return rqst;
}
return 0;
}
-static void rpcrdma_sendctx_done(struct kref *kref)
-{
- struct rpcrdma_req *req =
- container_of(kref, struct rpcrdma_req, rl_kref);
- struct rpcrdma_rep *rep = req->rl_reply;
-
- rpcrdma_complete_rqst(rep);
- rep->rr_rxprt->rx_stats.reply_waits_for_send++;
-}
-
static void rpcrdma_sendctx_dma_unmap(struct rpcrdma_sendctx *sc)
{
struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
}
/**
- * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer and release Send owner
* @sc: sendctx containing SGEs to unmap
*
*/
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
- if (!sc->sc_unmap_count)
- return;
+ struct rpcrdma_req *req = sc->sc_req;
rpcrdma_sendctx_dma_unmap(sc);
- kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
+ sc->sc_req = NULL;
+ rpcrdma_req_put(req);
+}
+
+/* No Send was posted. Release DMA mappings prepared for this
+ * sendctx, but leave the request reference count alone.
+ */
+static void rpcrdma_sendctx_cancel(struct rpcrdma_sendctx *sc)
+{
+ rpcrdma_sendctx_dma_unmap(sc);
+ sc->sc_req = NULL;
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
tail->iov_len))
return false;
- if (req->rl_sendctx->sc_unmap_count)
- kref_get(&req->rl_kref);
return true;
}
len -= len & 3;
if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
return false;
- kref_get(&req->rl_kref);
}
return true;
goto out_nosc;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
- kref_init(&req->rl_kref);
req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
req->rl_wr.num_sge = 0;
goto out_unmap;
}
+ /* The Send-side owner releases this reference when the
+ * Send has completed.
+ */
+ kref_get(&req->rl_kref);
return 0;
out_unmap:
- rpcrdma_sendctx_unmap(req->rl_sendctx);
+ rpcrdma_sendctx_cancel(req->rl_sendctx);
out_nosc:
trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
return ret;
goto out;
}
-static void rpcrdma_reply_done(struct kref *kref)
-{
- struct rpcrdma_req *req =
- container_of(kref, struct rpcrdma_req, rl_kref);
-
- rpcrdma_complete_rqst(req->rl_reply);
-}
-
/**
* rpcrdma_reply_handler - Process received RPC/RDMA messages
* @rep: Incoming rpcrdma_rep object to process
frwr_unmap_async(r_xprt, req);
/* LocalInv completion will complete the RPC */
else
- kref_put(&req->rl_kref, rpcrdma_reply_done);
+ rpcrdma_complete_rqst(rep);
out_post:
rpcrdma_post_recvs(r_xprt,
cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
rpcrdma_xprt_disconnect(r_xprt);
+
+ /* The disconnect's sendctx drain can return bc_prealloc reqs
+ * to bc_pa_list after xprt_destroy_backchannel() emptied it.
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ xprt_rdma_bc_destroy(xprt, 0);
+#endif
rpcrdma_buffer_destroy(&r_xprt->rx_buf);
xprt_rdma_free_addresses(xprt);
delay);
}
+/* rl_kref has two owners while a Send is outstanding: the rpc_rqst
+ * owner and the sendctx. Replies complete the RPC but do not drop
+ * either reference. The req returns to its free pool only after
+ * xprt_rdma_free_slot() or xprt_rdma_bc_free_rqst() has dropped the
+ * RPC-layer reference and rpcrdma_sendctx_unmap() has dropped the
+ * Send-side reference.
+ */
+static void rpcrdma_req_release(struct kref *kref)
+{
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
+ struct rpc_rqst *rqst = &req->rl_slot;
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt;
+
+ kref_init(&req->rl_kref);
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (bc_prealloc(rqst)) {
+ spin_lock(&xprt->bc_pa_lock);
+ list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+ spin_unlock(&xprt->bc_pa_lock);
+ return;
+ }
+#endif
+
+ if (xprt_wake_up_backlog(xprt, rqst))
+ return;
+
+ r_xprt = rpcx_to_rdmax(xprt);
+ memset(rqst, 0, sizeof(*rqst));
+ rpcrdma_buffer_put(&r_xprt->rx_buf, req);
+}
+
+void rpcrdma_req_put(struct rpcrdma_req *req)
+{
+ kref_put(&req->rl_kref, rpcrdma_req_release);
+}
+
/**
* xprt_rdma_alloc_slot - allocate an rpc_rqst
* @xprt: controlling RPC transport
req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (!req)
goto out_sleep;
+ kref_init(&req->rl_kref);
task->tk_rqstp = &req->rl_slot;
task->tk_status = 0;
return;
if (req) {
struct rpc_rqst *rqst = &req->rl_slot;
+ kref_init(&req->rl_kref);
if (!xprt_wake_up_backlog(xprt, rqst)) {
memset(rqst, 0, sizeof(*rqst));
rpcrdma_buffer_put(&r_xprt->rx_buf, req);
container_of(xprt, struct rpcrdma_xprt, rx_xprt);
rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
- if (!xprt_wake_up_backlog(xprt, rqst)) {
- memset(rqst, 0, sizeof(*rqst));
- rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
- }
+ rpcrdma_req_put(rpcr_to_rdmar(rqst));
}
static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
r_xprt->rx_stats.mrs_allocated,
r_xprt->rx_stats.local_inv_needed,
r_xprt->rx_stats.empty_sendctx_q,
- r_xprt->rx_stats.reply_waits_for_send);
+ 0LU); /* was reply_waits_for_send; column preserved */
}
static int
static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
+static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
+ unsigned long item);
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_sendctx *sc);
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
rpcrdma_xprt_drain(r_xprt);
rpcrdma_reps_unmap(r_xprt);
+ rpcrdma_sendctxs_destroy(r_xprt);
rpcrdma_reqs_reset(r_xprt);
rpcrdma_mrs_destroy(r_xprt);
- rpcrdma_sendctxs_destroy(r_xprt);
if (rpcrdma_ep_put(ep))
rdma_destroy_id(id);
if (!buf->rb_sc_ctxs)
return;
+
+ /* The QP is drained, but the final unsignaled Sends might not
+ * have been walked by a signaled Send completion. Release those
+ * Send owners before request buffers are reset.
+ */
+ for (i = rpcrdma_sendctx_next(buf, buf->rb_sc_tail);
+ i != rpcrdma_sendctx_next(buf, buf->rb_sc_head);
+ i = rpcrdma_sendctx_next(buf, i)) {
+ struct rpcrdma_sendctx *sc = buf->rb_sc_ctxs[i];
+
+ if (sc && sc->sc_req)
+ rpcrdma_sendctx_unmap(sc);
+ }
+
for (i = 0; i <= buf->rb_sc_last; i++)
kfree(buf->rb_sc_ctxs[i]);
kfree(buf->rb_sc_ctxs);
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
unsigned long next_tail;
- /* Unmap SGEs of previously completed but unsignaled
- * Sends by walking up the queue until @sc is found.
+ /* Release previously completed but unsignaled Sends by walking
+ * up the queue until @sc is found. Entries left behind by a
+ * failed rpcrdma_prepare_send_sges() have sc_req cleared.
*/
next_tail = buf->rb_sc_tail;
do {
+ struct rpcrdma_sendctx *cur;
+
next_tail = rpcrdma_sendctx_next(buf, next_tail);
/* ORDER: item must be accessed _before_ tail is updated */
- rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
+ cur = buf->rb_sc_ctxs[next_tail];
+ if (cur->sc_req)
+ rpcrdma_sendctx_unmap(cur);
} while (buf->rb_sc_ctxs[next_tail] != sc);
/* accessed when receiving a reply */
unsigned long long total_rdma_reply;
unsigned long long fixup_copy_count;
- unsigned long reply_waits_for_send;
unsigned long local_inv_needed;
unsigned long nomsg_call_count;
unsigned long bcall_count;
struct rpcrdma_req *req);
void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep);
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req);
+void rpcrdma_req_put(struct rpcrdma_req *req);
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
gfp_t flags);