SUNRPC: only put task on cl_tasks list after the RPC call slot is reserved.
author     Dai Ngo <dai.ngo@oracle.com>
           Tue, 19 Nov 2024 21:43:22 +0000 (13:43 -0800)
committer  Anna Schumaker <anna.schumaker@oracle.com>
           Mon, 13 Jan 2025 18:27:25 +0000 (13:27 -0500)
Under heavy write load, we've seen the cl_tasks list grow to
millions of entries. Even though the list is extremely long,
the system still runs fine until the user asks for information
on all active RPC tasks by reading the client's tasks file
under debugfs.

When this happens, tasks_start acquires the cl_lock to walk the
cl_tasks list, returning one entry at a time to the caller. The
cl_lock is held until all tasks on this list have been processed.
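
The walk follows the usual seq_file pattern: the lock taken in
->start is only dropped in ->stop. A simplified sketch, abridged
from the tasks iterator in net/sunrpc/debugfs.c (the ->next and
->show steps are omitted):

    #include <linux/seq_file.h>
    #include <linux/sunrpc/clnt.h>

    static void *tasks_start(struct seq_file *f, loff_t *pos)
            __acquires(&clnt->cl_lock)
    {
            struct rpc_clnt *clnt = f->private;

            /* Held across the entire walk of cl_tasks */
            spin_lock(&clnt->cl_lock);
            return seq_list_start(&clnt->cl_tasks, *pos);
    }

    static void tasks_stop(struct seq_file *f, void *v)
            __releases(&clnt->cl_lock)
    {
            struct rpc_clnt *clnt = f->private;

            /* Only here, after the last entry, do waiters get the lock */
            spin_unlock(&clnt->cl_lock);
    }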

While the cl_lock is held, completed RPC tasks have to spin in
rpc_task_release_client waiting for the cl_lock. If there are
millions of entries in the cl_tasks list, it will take a long
time before tasks_stop is called and the cl_lock is released.

The spinning tasks can use up all the available CPUs in the
system, preventing other jobs from running and causing the
system to temporarily lock up.
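
For reference, the pre-patch shape of the release path that ends
up spinning, simplified to just the list handling (the real
function also deals with transport state):

    #include <linux/sunrpc/clnt.h>
    #include <linux/sunrpc/sched.h>

    void rpc_task_release_client(struct rpc_task *task)
    {
            struct rpc_clnt *clnt = task->tk_client;

            if (clnt != NULL) {
                    /* Spins here while the debugfs walk holds cl_lock */
                    spin_lock(&clnt->cl_lock);
                    list_del(&task->tk_task);
                    spin_unlock(&clnt->cl_lock);
                    task->tk_client = NULL;

                    rpc_release_client(clnt);
            }
    }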

This patch fixes the problem by delaying insertion of the RPC
task on the cl_tasks list until the RPC call slot is reserved.
This limits the length of the cl_tasks list to the number of
call slots available in the system.
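
The cap comes from slot reservation: after this patch a task is
only added to cl_tasks in call_reserveresult(), i.e. once the
transport has granted it one of a finite number of rpc_rqst call
slots. A minimal, hypothetical sketch of such a bounded pool
(illustration only; the real slot logic lives in net/sunrpc/xprt.c
and sleeps waiting tasks on a backlog queue instead of failing):

    #include <linux/spinlock.h>
    #include <linux/types.h>

    /* Hypothetical bounded slot pool; pool->lock must have been
     * set up with spin_lock_init() and pool->free with the slot
     * table size.  At most that many tasks can hold a slot at
     * once, which now also caps the length of cl_tasks. */
    struct slot_pool {
            spinlock_t      lock;
            unsigned int    free;   /* call slots still available */
    };

    static bool slot_reserve(struct slot_pool *pool)
    {
            bool granted;

            spin_lock(&pool->lock);
            granted = pool->free > 0;
            if (granted)
                    pool->free--;   /* task now owns a call slot */
            spin_unlock(&pool->lock);

            return granted;         /* on failure the task would sleep */
    }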

Signed-off-by: Dai Ngo <dai.ngo@oracle.com>
Signed-off-by: Anna Schumaker <anna.schumaker@oracle.com>
include/linux/sunrpc/clnt.h
net/sunrpc/clnt.c

index 5321585c778fcc1fef0e0420cb481786c02a7aac..fec976e58174230ae24c638a27e9e28438eb68fd 100644 (file)
@@ -93,6 +93,7 @@ struct rpc_clnt {
        const struct cred       *cl_cred;
        unsigned int            cl_max_connect; /* max number of transports not to the same IP */
        struct super_block *pipefs_sb;
+       atomic_t                cl_task_count;
 };
 
 /*
index 0090162ee8c350568c91f1bcd951675ac3ae141c..cc5014b58e3b6f9eeb3044bb6c68bfa71b931ebf 100644 (file)
@@ -958,12 +958,17 @@ void rpc_shutdown_client(struct rpc_clnt *clnt)
 
        trace_rpc_clnt_shutdown(clnt);
 
+       clnt->cl_shutdown = 1;
        while (!list_empty(&clnt->cl_tasks)) {
                rpc_killall_tasks(clnt);
                wait_event_timeout(destroy_wait,
                        list_empty(&clnt->cl_tasks), 1*HZ);
        }
 
+       /* wait for tasks still in workqueue or waitqueue */
+       wait_event_timeout(destroy_wait,
+                          atomic_read(&clnt->cl_task_count) == 0, 1 * HZ);
+
        rpc_release_client(clnt);
 }
 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
@@ -1139,6 +1144,7 @@ void rpc_task_release_client(struct rpc_task *task)
                list_del(&task->tk_task);
                spin_unlock(&clnt->cl_lock);
                task->tk_client = NULL;
+               atomic_dec(&clnt->cl_task_count);
 
                rpc_release_client(clnt);
        }
@@ -1189,10 +1195,7 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
                task->tk_flags |= RPC_TASK_TIMEOUT;
        if (clnt->cl_noretranstimeo)
                task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
-       /* Add to the client's list of all tasks */
-       spin_lock(&clnt->cl_lock);
-       list_add_tail(&task->tk_task, &clnt->cl_tasks);
-       spin_unlock(&clnt->cl_lock);
+       atomic_inc(&clnt->cl_task_count);
 }
 
 static void
@@ -1787,9 +1790,14 @@ call_reserveresult(struct rpc_task *task)
        if (status >= 0) {
                if (task->tk_rqstp) {
                        task->tk_action = call_refresh;
+
+                       /* Add to the client's list of all tasks */
+                       spin_lock(&task->tk_client->cl_lock);
+                       if (list_empty(&task->tk_task))
+                               list_add_tail(&task->tk_task, &task->tk_client->cl_tasks);
+                       spin_unlock(&task->tk_client->cl_lock);
                        return;
                }
-
                rpc_call_rpcerror(task, -EIO);
                return;
        }