unsigned int max_req);
void xprt_free(struct rpc_xprt *);
void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
+void xprt_add_backlog_noncongested(struct rpc_xprt *xprt,
+ struct rpc_task *task);
bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);
void xprt_cleanup_ids(void);
}
EXPORT_SYMBOL_GPL(xprt_add_backlog);
+/**
+ * xprt_add_backlog_noncongested - queue task on backlog
+ * @xprt: transport whose backlog queue receives the task
+ * @task: task to queue
+ *
+ * Like xprt_add_backlog, but does not set XPRT_CONGESTED.
+ * For transports whose free_slot path does not synchronize
+ * with xprt_throttle_congested via reserve_lock.
+ *
+ * NOTE(review): presumably called in the same context as
+ * xprt_add_backlog (i.e. under xprt->reserve_lock) — confirm
+ * at call sites before relying on that.
+ */
+void xprt_add_backlog_noncongested(struct rpc_xprt *xprt,
+ struct rpc_task *task)
+{
+ rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
+}
+EXPORT_SYMBOL_GPL(xprt_add_backlog_noncongested);
+
static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
struct rpc_rqst *req = data;
out_sleep:
task->tk_status = -EAGAIN;
- xprt_add_backlog(xprt, task);
+ /* Queue without setting XPRT_CONGESTED: this transport's
+ * free_slot path does not take reserve_lock, so the
+ * congested-flag protocol cannot be relied on here.
+ */
+ xprt_add_backlog_noncongested(xprt, task);
+ /* A buffer freed between buffer_get and rpc_sleep_on
+ * goes back to the pool with no waiter to wake.
+ * Re-check after joining the backlog to close that gap.
+ */
+ req = rpcrdma_buffer_get(&r_xprt->rx_buf);
+ if (req) {
+ struct rpc_rqst *rqst = &req->rl_slot;
+
+ if (!xprt_wake_up_backlog(xprt, rqst)) {
+ /* No backlogged waiter took the slot; return the
+ * buffer to the pool. NOTE(review): rl_slot is
+ * zeroed first — presumably to clear stale rqst
+ * state before the next allocation reuses it;
+ * confirm against rpcrdma_buffer_put's contract.
+ */
+ memset(rqst, 0, sizeof(*rqst));
+ rpcrdma_buffer_put(&r_xprt->rx_buf, req);
+ }
+ }
}
/**