]> git.ipfire.org Git - thirdparty/kernel/linux.git/commitdiff
xprtrdma: Decouple req recycling from RPC completion
authorChuck Lever <chuck.lever@oracle.com>
Tue, 26 May 2026 14:14:02 +0000 (10:14 -0400)
committerAnna Schumaker <anna.schumaker@hammerspace.com>
Mon, 8 Jun 2026 14:21:55 +0000 (10:21 -0400)
rl_kref formerly served two distinct lifetimes through a single
refcount: it gated when a Reply could wake its RPC task, and it
gated when an rpcrdma_req could return to its free pool. The
marshal path took the Send-side reference only when SGEs needed
DMA-unmap (sc_unmap_count > 0), which made a Send carrying only
pre-registered buffers an exception: the Reply handler dropped
rl_kref from 1 to 0 and freed the req while the HCA might still
be DMA-reading from its send buffer.

Give rl_kref a narrower job. The RPC layer takes one reference
when slot allocation hands a req out. rpcrdma_prepare_send_sges()
takes a Send-side reference unconditionally after WR preparation
succeeds. xprt_rdma_free_slot() and xprt_rdma_bc_free_rqst() drop
the RPC-layer reference; rpcrdma_sendctx_unmap() drops the
Send-side reference. The req returns to its free pool only after
both owners have signed off.

The existing kref_init(&req->rl_kref) call in
rpcrdma_prepare_send_sges() is removed. Initialization moves to
the slot-allocation paths (xprt_rdma_alloc_slot and
rpcrdma_bc_rqst_get), and the release callback re-arms rl_kref
before the req returns to a free pool. A re-init in the marshal
path would discard the RPC-layer reference that already exists
on entry.

Three invariants follow:

  - Any rpcrdma_req held by an rpc_rqst has rl_kref >= 1.
    xprt_rdma_alloc_slot(), rpcrdma_bc_rqst_get(), and the
    backlog-wake branch in xprt_rdma_alloc_slot() each kref_init
    rl_kref before publishing the req. Without this invariant,
    an RPC task that aborts between slot allocation and marshal
    (gss_refresh failure or signal during call_connect, for
    example) would drive xprt_release() ->
    xprt_rdma_free_slot() -> kref_put against a refcount of
    zero, saturating refcount_t and stranding the slot.

  - The Send-side reference is taken only after WR prep
    succeeds. A mapping failure in rpcrdma_prepare_send_sges()
    runs rpcrdma_sendctx_cancel(), which DMA-unmaps the sendctx
    and clears sc_req without touching rl_kref. The sendctx
    ring walks in rpcrdma_sendctx_put_locked() and
    rpcrdma_sendctxs_destroy() skip entries with sc_req == NULL,
    so a burst of -EIO marshal failures cannot hold reqs off
    rb_send_bufs.

  - The release callback re-arms rl_kref so the next consumer
    enters with the invariant satisfied.

Replies now complete the RPC directly. rpcrdma_reply_handler()
calls rpcrdma_complete_rqst() in place of kref_put on the
non-LocalInv branch. The LocalInv branch already completes the
RPC from frwr_unmap_async() and is unaffected.

Because Send-side references can now outlive RPC completion,
connection teardown drains sendctx entries whose unsignaled
Sends never had a later signaled completion to walk the ring.
rpcrdma_sendctxs_destroy() walks the active range and runs
rpcrdma_sendctx_unmap() on each entry with a non-NULL sc_req
before the request buffers are reset, and is moved ahead of
rpcrdma_reqs_reset() in rpcrdma_xprt_disconnect() so the reqs
are still in their pre-reset state when the Send-side refs are
released.

The drain creates a teardown-ordering hazard on the backchannel
path. With the new lifetime, releasing a bc_prealloc req from
rpcrdma_req_release() re-adds it to bc_pa_list. The disconnect
in xprt_rdma_destroy() runs after xprt_destroy_backchannel() has
already emptied bc_pa_list, so the drained reqs would otherwise
leak. xprt_rdma_destroy() now runs xprt_rdma_bc_destroy(xprt, 0)
a second time after the disconnect to reclaim them.

Fixes: 0ab115237025 ("xprtrdma: Wake RPCs directly in rpcrdma_wc_send path")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <anna.schumaker@hammerspace.com>
net/sunrpc/xprtrdma/backchannel.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 2f0f9618dd0580f8ad72f64b4926b0c5be043bb7..e5b3463da25f9191119858a7f24f7d4b8d07a417 100644 (file)
@@ -159,9 +159,7 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
        rpcrdma_rep_put(&r_xprt->rx_buf, rep);
        req->rl_reply = NULL;
 
-       spin_lock(&xprt->bc_pa_lock);
-       list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
-       spin_unlock(&xprt->bc_pa_lock);
+       rpcrdma_req_put(req);
        xprt_put(xprt);
 }
 
@@ -203,6 +201,7 @@ create_req:
        rqst->rq_xprt = xprt;
        __set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
        xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+       kref_init(&req->rl_kref);
        return rqst;
 }
 
index 16b9987858d683089cf3e3e3660ac0efb60e4a7d..69380f9dfa499dcf5ec09cc3afa5ae41819b563d 100644 (file)
@@ -467,16 +467,6 @@ static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
        return 0;
 }
 
-static void rpcrdma_sendctx_done(struct kref *kref)
-{
-       struct rpcrdma_req *req =
-               container_of(kref, struct rpcrdma_req, rl_kref);
-       struct rpcrdma_rep *rep = req->rl_reply;
-
-       rpcrdma_complete_rqst(rep);
-       rep->rr_rxprt->rx_stats.reply_waits_for_send++;
-}
-
 static void rpcrdma_sendctx_dma_unmap(struct rpcrdma_sendctx *sc)
 {
        struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
@@ -493,17 +483,26 @@ static void rpcrdma_sendctx_dma_unmap(struct rpcrdma_sendctx *sc)
 }
 
 /**
- * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
+ * rpcrdma_sendctx_unmap - DMA-unmap Send buffer and release Send owner
  * @sc: sendctx containing SGEs to unmap
  *
  */
 void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
-       if (!sc->sc_unmap_count)
-               return;
+       struct rpcrdma_req *req = sc->sc_req;
 
        rpcrdma_sendctx_dma_unmap(sc);
-       kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
+       sc->sc_req = NULL;
+       rpcrdma_req_put(req);
+}
+
+/* No Send was posted. Release DMA mappings prepared for this
+ * sendctx, but leave the request reference count alone.
+ */
+static void rpcrdma_sendctx_cancel(struct rpcrdma_sendctx *sc)
+{
+       rpcrdma_sendctx_dma_unmap(sc);
+       sc->sc_req = NULL;
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -695,8 +694,6 @@ static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
                                              tail->iov_len))
                        return false;
 
-       if (req->rl_sendctx->sc_unmap_count)
-               kref_get(&req->rl_kref);
        return true;
 }
 
@@ -726,7 +723,6 @@ static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
                len -= len & 3;
                if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
                        return false;
-               kref_get(&req->rl_kref);
        }
 
        return true;
@@ -755,7 +751,6 @@ inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
                goto out_nosc;
        req->rl_sendctx->sc_unmap_count = 0;
        req->rl_sendctx->sc_req = req;
-       kref_init(&req->rl_kref);
        req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
        req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
        req->rl_wr.num_sge = 0;
@@ -783,10 +778,14 @@ inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
                goto out_unmap;
        }
 
+       /* The Send-side owner releases this reference when the
+        * Send has completed.
+        */
+       kref_get(&req->rl_kref);
        return 0;
 
 out_unmap:
-       rpcrdma_sendctx_unmap(req->rl_sendctx);
+       rpcrdma_sendctx_cancel(req->rl_sendctx);
 out_nosc:
        trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
        return ret;
@@ -1364,14 +1363,6 @@ out_badheader:
        goto out;
 }
 
-static void rpcrdma_reply_done(struct kref *kref)
-{
-       struct rpcrdma_req *req =
-               container_of(kref, struct rpcrdma_req, rl_kref);
-
-       rpcrdma_complete_rqst(req->rl_reply);
-}
-
 /**
  * rpcrdma_reply_handler - Process received RPC/RDMA messages
  * @rep: Incoming rpcrdma_rep object to process
@@ -1443,7 +1434,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
                frwr_unmap_async(r_xprt, req);
                /* LocalInv completion will complete the RPC */
        else
-               kref_put(&req->rl_kref, rpcrdma_reply_done);
+               rpcrdma_complete_rqst(rep);
 
 out_post:
        rpcrdma_post_recvs(r_xprt,
index 1a54993f7ffb6173a5d23871ffd762ebba7b6a74..875ad852a11da39522457134adf18db020812bc2 100644 (file)
@@ -279,6 +279,13 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
        cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 
        rpcrdma_xprt_disconnect(r_xprt);
+
+       /* The disconnect's sendctx drain can return bc_prealloc reqs
+        * to bc_pa_list after xprt_destroy_backchannel() emptied it.
+        */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+       xprt_rdma_bc_destroy(xprt, 0);
+#endif
        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 
        xprt_rdma_free_addresses(xprt);
@@ -488,6 +495,45 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
                           delay);
 }
 
+/* rl_kref has two owners while a Send is outstanding: the rpc_rqst
+ * owner and the sendctx. Replies complete the RPC but do not drop
+ * either reference. The req returns to its free pool only after
+ * xprt_rdma_free_slot() or xprt_rdma_bc_free_rqst() has dropped the
+ * RPC-layer reference and rpcrdma_sendctx_unmap() has dropped the
+ * Send-side reference.
+ */
+static void rpcrdma_req_release(struct kref *kref)
+{
+       struct rpcrdma_req *req =
+               container_of(kref, struct rpcrdma_req, rl_kref);
+       struct rpc_rqst *rqst = &req->rl_slot;
+       struct rpc_xprt *xprt = rqst->rq_xprt;
+       struct rpcrdma_xprt *r_xprt;
+
+       kref_init(&req->rl_kref);
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+       if (bc_prealloc(rqst)) {
+               spin_lock(&xprt->bc_pa_lock);
+               list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+               spin_unlock(&xprt->bc_pa_lock);
+               return;
+       }
+#endif
+
+       if (xprt_wake_up_backlog(xprt, rqst))
+               return;
+
+       r_xprt = rpcx_to_rdmax(xprt);
+       memset(rqst, 0, sizeof(*rqst));
+       rpcrdma_buffer_put(&r_xprt->rx_buf, req);
+}
+
+void rpcrdma_req_put(struct rpcrdma_req *req)
+{
+       kref_put(&req->rl_kref, rpcrdma_req_release);
+}
+
 /**
  * xprt_rdma_alloc_slot - allocate an rpc_rqst
  * @xprt: controlling RPC transport
@@ -506,6 +552,7 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
        req = rpcrdma_buffer_get(&r_xprt->rx_buf);
        if (!req)
                goto out_sleep;
+       kref_init(&req->rl_kref);
        task->tk_rqstp = &req->rl_slot;
        task->tk_status = 0;
        return;
@@ -521,6 +568,7 @@ out_sleep:
        if (req) {
                struct rpc_rqst *rqst = &req->rl_slot;
 
+               kref_init(&req->rl_kref);
                if (!xprt_wake_up_backlog(xprt, rqst)) {
                        memset(rqst, 0, sizeof(*rqst));
                        rpcrdma_buffer_put(&r_xprt->rx_buf, req);
@@ -541,10 +589,7 @@ xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
                container_of(xprt, struct rpcrdma_xprt, rx_xprt);
 
        rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
-       if (!xprt_wake_up_backlog(xprt, rqst)) {
-               memset(rqst, 0, sizeof(*rqst));
-               rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
-       }
+       rpcrdma_req_put(rpcr_to_rdmar(rqst));
 }
 
 static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
@@ -717,7 +762,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
                   r_xprt->rx_stats.mrs_allocated,
                   r_xprt->rx_stats.local_inv_needed,
                   r_xprt->rx_stats.empty_sendctx_q,
-                  r_xprt->rx_stats.reply_waits_for_send);
+                  0LU); /* was reply_waits_for_send; column preserved */
 }
 
 static int
index aecf9c0a153f365991210b46cb3714e347b0c81c..97b8b2376602c897c2e4597c1bc791f2670de1e4 100644 (file)
@@ -65,6 +65,8 @@
 
 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
+static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
+                                         unsigned long item);
 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
                                       struct rpcrdma_sendctx *sc);
 static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
@@ -571,9 +573,9 @@ void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
 
        rpcrdma_xprt_drain(r_xprt);
        rpcrdma_reps_unmap(r_xprt);
+       rpcrdma_sendctxs_destroy(r_xprt);
        rpcrdma_reqs_reset(r_xprt);
        rpcrdma_mrs_destroy(r_xprt);
-       rpcrdma_sendctxs_destroy(r_xprt);
 
        if (rpcrdma_ep_put(ep))
                rdma_destroy_id(id);
@@ -605,6 +607,20 @@ static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
 
        if (!buf->rb_sc_ctxs)
                return;
+
+       /* The QP is drained, but the final unsignaled Sends might not
+        * have been walked by a signaled Send completion. Release those
+        * Send owners before request buffers are reset.
+        */
+       for (i = rpcrdma_sendctx_next(buf, buf->rb_sc_tail);
+            i != rpcrdma_sendctx_next(buf, buf->rb_sc_head);
+            i = rpcrdma_sendctx_next(buf, i)) {
+               struct rpcrdma_sendctx *sc = buf->rb_sc_ctxs[i];
+
+               if (sc && sc->sc_req)
+                       rpcrdma_sendctx_unmap(sc);
+       }
+
        for (i = 0; i <= buf->rb_sc_last; i++)
                kfree(buf->rb_sc_ctxs[i]);
        kfree(buf->rb_sc_ctxs);
@@ -739,15 +755,20 @@ static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        unsigned long next_tail;
 
-       /* Unmap SGEs of previously completed but unsignaled
-        * Sends by walking up the queue until @sc is found.
+       /* Release previously completed but unsignaled Sends by walking
+        * up the queue until @sc is found. Entries left behind by a
+        * failed rpcrdma_prepare_send_sges() have sc_req cleared.
         */
        next_tail = buf->rb_sc_tail;
        do {
+               struct rpcrdma_sendctx *cur;
+
                next_tail = rpcrdma_sendctx_next(buf, next_tail);
 
                /* ORDER: item must be accessed _before_ tail is updated */
-               rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
+               cur = buf->rb_sc_ctxs[next_tail];
+               if (cur->sc_req)
+                       rpcrdma_sendctx_unmap(cur);
 
        } while (buf->rb_sc_ctxs[next_tail] != sc);
 
index f53a7747272451f8280fcfda61d1b1f94ee3876a..f879d9b9f57ef4001051df5a30f052426ab0c5c7 100644 (file)
@@ -427,7 +427,6 @@ struct rpcrdma_stats {
        /* accessed when receiving a reply */
        unsigned long long      total_rdma_reply;
        unsigned long long      fixup_copy_count;
-       unsigned long           reply_waits_for_send;
        unsigned long           local_inv_needed;
        unsigned long           nomsg_call_count;
        unsigned long           bcall_count;
@@ -505,6 +504,7 @@ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
                        struct rpcrdma_req *req);
 void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep);
 void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req);
+void rpcrdma_req_put(struct rpcrdma_req *req);
 
 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
                            gfp_t flags);