--- /dev/null
+From e86be3a04bc4aeaf12f93af35f08f8d4385bcd98 Mon Sep 17 00:00:00 2001
+From: Trond Myklebust <trond.myklebust@hammerspace.com>
+Date: Tue, 25 May 2021 18:43:38 -0400
+Subject: SUNRPC: More fixes for backlog congestion
+
+From: Trond Myklebust <trond.myklebust@hammerspace.com>
+
+commit e86be3a04bc4aeaf12f93af35f08f8d4385bcd98 upstream.
+
+Ensure that we fix the XPRT_CONGESTED starvation issue for RDMA as well
+as for socket-based transports.
+Also ensure that we always initialise the request after waking up from
+the backlog list.
+
+Fixes: e877a88d1f06 ("SUNRPC in case of backlog, hand free slots directly to waiting task")
+Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
+Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
+---
+ include/linux/sunrpc/xprt.h | 2 +
+ net/sunrpc/xprt.c | 58 +++++++++++++++++++---------------------
+ net/sunrpc/xprtrdma/transport.c | 12 ++++----
+ net/sunrpc/xprtrdma/verbs.c | 18 ++++++++++--
+ net/sunrpc/xprtrdma/xprt_rdma.h |    1 +
+ 5 files changed, 52 insertions(+), 39 deletions(-)
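+
+[ Note: the hunks below bring the RDMA transport in line with the
+  backlog hand-off that the socket-based transports already use.  For
+  context, the freeing side of that pattern looks roughly like the
+  sketch below; it is modelled on xprt_free_slot() in
+  net/sunrpc/xprt.c, which this patch does not touch, and
+  example_free_slot() is a hypothetical name. ]
+
+	static void example_free_slot(struct rpc_xprt *xprt,
+				      struct rpc_rqst *req)
+	{
+		spin_lock(&xprt->reserve_lock);
+		/* Hand the slot directly to the first task sleeping on the
+		 * backlog queue; __xprt_set_rq() stores req in that task's
+		 * tk_rqstp before the task is woken.
+		 */
+		if (!xprt_wake_up_backlog(xprt, req)) {
+			/* No waiter: recycle the slot onto the free list. */
+			memset(req, 0, sizeof(*req));
+			list_add(&req->rq_list, &xprt->free);
+		}
+		spin_unlock(&xprt->reserve_lock);
+	}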
+
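+[ Note: on the waiter side, a request handed over by __xprt_set_rq()
+  is initialised by the new rpc_action callback before the task
+  resumes, which is why the xprt_retry_reserve() hunk below can simply
+  return once it finds tk_rqstp already set.  A simplified,
+  hypothetical walk-through of that waiter path follows;
+  example_reserve_slot() and example_retry_reserve() are illustrative
+  names only. ]
+
+	static void example_reserve_slot(struct rpc_xprt *xprt,
+					 struct rpc_task *task)
+	{
+		/* No free slot: sleep on the backlog.  rpc_sleep_on() now
+		 * installs xprt_complete_request_init() as the rpc_action,
+		 * so when a departing task hands over its slot, the
+		 * callback runs xprt_request_init() before we resume.
+		 */
+		task->tk_status = -EAGAIN;
+		xprt_add_backlog(xprt, task);
+	}
+
+	static void example_retry_reserve(struct rpc_task *task)
+	{
+		task->tk_status = 0;
+		/* Slot handed over (and initialised) while we slept. */
+		if (task->tk_rqstp != NULL)
+			return;
+		task->tk_status = -EAGAIN;
+		xprt_do_reserve(task->tk_xprt, task);
+	}
+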
+--- a/include/linux/sunrpc/xprt.h
++++ b/include/linux/sunrpc/xprt.h
+@@ -367,6 +367,8 @@ struct rpc_xprt * xprt_alloc(struct net
+ unsigned int num_prealloc,
+ unsigned int max_req);
+ void xprt_free(struct rpc_xprt *);
++void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
++bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);
+
+ static inline int
+ xprt_enable_swap(struct rpc_xprt *xprt)
+--- a/net/sunrpc/xprt.c
++++ b/net/sunrpc/xprt.c
+@@ -1575,11 +1575,18 @@ xprt_transmit(struct rpc_task *task)
+ spin_unlock(&xprt->queue_lock);
+ }
+
+-static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
++static void xprt_complete_request_init(struct rpc_task *task)
++{
++ if (task->tk_rqstp)
++ xprt_request_init(task);
++}
++
++void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+ {
+ set_bit(XPRT_CONGESTED, &xprt->state);
+- rpc_sleep_on(&xprt->backlog, task, NULL);
++ rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
+ }
++EXPORT_SYMBOL_GPL(xprt_add_backlog);
+
+ static bool __xprt_set_rq(struct rpc_task *task, void *data)
+ {
+@@ -1587,14 +1594,13 @@ static bool __xprt_set_rq(struct rpc_tas
+
+ if (task->tk_rqstp == NULL) {
+ memset(req, 0, sizeof(*req)); /* mark unused */
+- task->tk_status = -EAGAIN;
+ task->tk_rqstp = req;
+ return true;
+ }
+ return false;
+ }
+
+-static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
++bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
+ {
+ if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
+ clear_bit(XPRT_CONGESTED, &xprt->state);
+@@ -1602,6 +1608,7 @@ static bool xprt_wake_up_backlog(struct
+ }
+ return true;
+ }
++EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
+
+ static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
+ {
+@@ -1611,7 +1618,7 @@ static bool xprt_throttle_congested(stru
+ goto out;
+ spin_lock(&xprt->reserve_lock);
+ if (test_bit(XPRT_CONGESTED, &xprt->state)) {
+- rpc_sleep_on(&xprt->backlog, task, NULL);
++ xprt_add_backlog(xprt, task);
+ ret = true;
+ }
+ spin_unlock(&xprt->reserve_lock);
+@@ -1780,10 +1787,6 @@ xprt_request_init(struct rpc_task *task)
+ struct rpc_xprt *xprt = task->tk_xprt;
+ struct rpc_rqst *req = task->tk_rqstp;
+
+- if (req->rq_task)
+- /* Already initialized */
+- return;
+-
+ req->rq_task = task;
+ req->rq_xprt = xprt;
+ req->rq_buffer = NULL;
+@@ -1844,10 +1847,8 @@ void xprt_retry_reserve(struct rpc_task
+ struct rpc_xprt *xprt = task->tk_xprt;
+
+ task->tk_status = 0;
+- if (task->tk_rqstp != NULL) {
+- xprt_request_init(task);
++ if (task->tk_rqstp != NULL)
+ return;
+- }
+
+ task->tk_status = -EAGAIN;
+ xprt_do_reserve(xprt, task);
+@@ -1872,24 +1873,21 @@ void xprt_release(struct rpc_task *task)
+ }
+
+ xprt = req->rq_xprt;
+- if (xprt) {
+- xprt_request_dequeue_xprt(task);
+- spin_lock(&xprt->transport_lock);
+- xprt->ops->release_xprt(xprt, task);
+- if (xprt->ops->release_request)
+- xprt->ops->release_request(task);
+- xprt_schedule_autodisconnect(xprt);
+- spin_unlock(&xprt->transport_lock);
+- if (req->rq_buffer)
+- xprt->ops->buf_free(task);
+- xdr_free_bvec(&req->rq_rcv_buf);
+- xdr_free_bvec(&req->rq_snd_buf);
+- if (req->rq_cred != NULL)
+- put_rpccred(req->rq_cred);
+- if (req->rq_release_snd_buf)
+- req->rq_release_snd_buf(req);
+- } else
+- xprt = task->tk_xprt;
++ xprt_request_dequeue_xprt(task);
++ spin_lock(&xprt->transport_lock);
++ xprt->ops->release_xprt(xprt, task);
++ if (xprt->ops->release_request)
++ xprt->ops->release_request(task);
++ xprt_schedule_autodisconnect(xprt);
++ spin_unlock(&xprt->transport_lock);
++ if (req->rq_buffer)
++ xprt->ops->buf_free(task);
++ xdr_free_bvec(&req->rq_rcv_buf);
++ xdr_free_bvec(&req->rq_snd_buf);
++ if (req->rq_cred != NULL)
++ put_rpccred(req->rq_cred);
++ if (req->rq_release_snd_buf)
++ req->rq_release_snd_buf(req);
+
+ task->tk_rqstp = NULL;
+ if (likely(!bc_prealloc(req)))
+--- a/net/sunrpc/xprtrdma/transport.c
++++ b/net/sunrpc/xprtrdma/transport.c
+@@ -520,9 +520,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xp
+ return;
+
+ out_sleep:
+- set_bit(XPRT_CONGESTED, &xprt->state);
+- rpc_sleep_on(&xprt->backlog, task, NULL);
+ task->tk_status = -EAGAIN;
++ xprt_add_backlog(xprt, task);
+ }
+
+ /**
+@@ -537,10 +536,11 @@ xprt_rdma_free_slot(struct rpc_xprt *xpr
+ struct rpcrdma_xprt *r_xprt =
+ container_of(xprt, struct rpcrdma_xprt, rx_xprt);
+
+- memset(rqst, 0, sizeof(*rqst));
+- rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
+- if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
+- clear_bit(XPRT_CONGESTED, &xprt->state);
++ rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
++ if (!xprt_wake_up_backlog(xprt, rqst)) {
++ memset(rqst, 0, sizeof(*rqst));
++ rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
++ }
+ }
+
+ static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
+--- a/net/sunrpc/xprtrdma/verbs.c
++++ b/net/sunrpc/xprtrdma/verbs.c
+@@ -1198,6 +1198,20 @@ void rpcrdma_mr_put(struct rpcrdma_mr *m
+ }
+
+ /**
++ * rpcrdma_reply_put - Put reply buffers back into pool
++ * @buffers: buffer pool
++ * @req: object to return
++ *
++ */
++void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
++{
++ if (req->rl_reply) {
++ rpcrdma_rep_put(buffers, req->rl_reply);
++ req->rl_reply = NULL;
++ }
++}
++
++/**
+ * rpcrdma_buffer_get - Get a request buffer
+ * @buffers: Buffer pool from which to obtain a buffer
+ *
+@@ -1225,9 +1239,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer
+ */
+ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+ {
+- if (req->rl_reply)
+- rpcrdma_rep_put(buffers, req->rl_reply);
+- req->rl_reply = NULL;
++ rpcrdma_reply_put(buffers, req);
+
+ spin_lock(&buffers->rb_lock);
+ list_add(&req->rl_list, &buffers->rb_send_bufs);
+--- a/net/sunrpc/xprtrdma/xprt_rdma.h
++++ b/net/sunrpc/xprtrdma/xprt_rdma.h
+@@ -472,6 +472,7 @@ void rpcrdma_mrs_refresh(struct rpcrdma_
+ struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
+ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
+ struct rpcrdma_req *req);
++void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req);
+ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+
+ bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,