net/rds: Add per cp work queue
author    Allison Henderson <allison.henderson@oracle.com>
          Fri, 9 Jan 2026 22:48:42 +0000 (15:48 -0700)
committer Paolo Abeni <pabeni@redhat.com>
          Tue, 13 Jan 2026 11:27:03 +0000 (12:27 +0100)
This patch adds a per connection path workqueue which can be
initialized and used independently of the globally shared rds_wq.

This patch is the first in a series that aims to address TCP ACK
timeouts during the TCP socket shutdown sequence.

This initial refactoring lays the groundwork needed to alleviate
queue congestion during heavy reads and writes.  The independently
managed queues will allow shutdowns and reconnects to respond more
quickly, before the peer(s) time out waiting for the expected ACKs.

Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
Link: https://patch.msgid.link/20260109224843.128076-2-achender@kernel.org
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
net/rds/cong.c
net/rds/connection.c
net/rds/ib_recv.c
net/rds/ib_send.c
net/rds/rds.h
net/rds/send.c
net/rds/tcp_recv.c
net/rds/tcp_send.c
net/rds/threads.c
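
In this patch every path's cp_wq still points at the shared rds_wq (see
the __rds_conn_create hunk below), so behavior is unchanged; only the
call sites are switched over. As a minimal sketch of what the new field
enables, a later patch in the series could give each path a dedicated
queue, roughly as follows. The queue name, flags, fallback, and teardown
here are illustrative assumptions, not code from this series:

    /* Hypothetical follow-up: a dedicated ordered queue per conn path,
     * so shutdown/reconnect work is not stuck behind heavy send/recv
     * work queued on the global rds_wq.
     */
    cp->cp_wq = alloc_ordered_workqueue("krds_cp_wq", WQ_MEM_RECLAIM);
    if (!cp->cp_wq)
            cp->cp_wq = rds_wq;     /* fall back to the shared queue */

    /* ... and on path teardown, only destroy what was allocated: */
    if (cp->cp_wq && cp->cp_wq != rds_wq) {
            destroy_workqueue(cp->cp_wq);
            cp->cp_wq = NULL;
    }

Since every call site below already queues onto cp->cp_wq, switching a
path to a private queue would need no further changes at these sites.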

diff --git a/net/rds/cong.c b/net/rds/cong.c
index 8b689ebbd5b52515eed2a5c83c7c524c0ffbc99d..ac1f120c10f9621703177dbbbd23ca246317237f 100644
--- a/net/rds/cong.c
+++ b/net/rds/cong.c
@@ -242,7 +242,7 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
                         *    therefore trigger warnings.
                         * Defer the xmit to rds_send_worker() instead.
                         */
-                       queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+                       queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
                }
                rcu_read_unlock();
        }
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 68bc88cce84ec083cc5367b2990509575120d441..dc7323707f450a4e2f34ecace9caa5fe6f98ca61 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -269,6 +269,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
                __rds_conn_path_init(conn, &conn->c_path[i],
                                     is_outgoing);
                conn->c_path[i].cp_index = i;
+               conn->c_path[i].cp_wq = rds_wq;
        }
        rcu_read_lock();
        if (rds_destroy_pending(conn))
@@ -884,7 +885,7 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
                rcu_read_unlock();
                return;
        }
-       queue_work(rds_wq, &cp->cp_down_w);
+       queue_work(cp->cp_wq, &cp->cp_down_w);
        rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_drop);
@@ -909,7 +910,7 @@ void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
        }
        if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
            !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
-               queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+               queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0);
        rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 4248dfa816ebf84b94de937c3eb4c3943502b65e..357128d34a546c457de98f7996bb2bb081400607 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -457,7 +457,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
            (must_wake ||
            (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
            rds_ib_ring_empty(&ic->i_recv_ring))) {
-               queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+               queue_delayed_work(conn->c_path->cp_wq, &conn->c_recv_w, 1);
        }
        if (can_wait)
                cond_resched();
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4190b90ff3b18a1848ee45e18d07134101bb3905..f9d28ddd168d8ea5edc10a3fb8982f654bc07763 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -297,7 +297,7 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 
        if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
            test_bit(0, &conn->c_map_queued))
-               queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+               queue_delayed_work(conn->c_path->cp_wq, &conn->c_send_w, 0);
 
        /* We expect errors as the qp is drained during shutdown */
        if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
@@ -419,7 +419,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 
        atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
        if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-               queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+               queue_delayed_work(conn->c_path->cp_wq, &conn->c_send_w, 0);
 
        WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 
diff --git a/net/rds/rds.h b/net/rds/rds.h
index a029e5fcdea72a3e536f3c1490633390ae89b70f..b35afa2658cc45fc58b16bbc59f8fb6a4f55ea54 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -118,6 +118,7 @@ struct rds_conn_path {
 
        void                    *cp_transport_data;
 
+       struct workqueue_struct *cp_wq;
        atomic_t                cp_state;
        unsigned long           cp_send_gen;
        unsigned long           cp_flags;
diff --git a/net/rds/send.c b/net/rds/send.c
index 0b3d0ef2f008b6551d401c7513fd15589a66fa9d..3e3d028bc21ee8b50c9cb72dce2a6fb49bbff5de 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -458,7 +458,8 @@ over_batch:
                        if (rds_destroy_pending(cp->cp_conn))
                                ret = -ENETUNREACH;
                        else
-                               queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+                               queue_delayed_work(cp->cp_wq,
+                                                  &cp->cp_send_w, 1);
                        rcu_read_unlock();
                } else if (raced) {
                        rds_stats_inc(s_send_lock_queue_raced);
@@ -1380,7 +1381,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
                if (rds_destroy_pending(cpath->cp_conn))
                        ret = -ENETUNREACH;
                else
-                       queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+                       queue_delayed_work(cpath->cp_wq, &cpath->cp_send_w, 1);
                rcu_read_unlock();
        }
        if (ret)
@@ -1470,10 +1471,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
        rds_stats_inc(s_send_queued);
        rds_stats_inc(s_send_pong);
 
-       /* schedule the send work on rds_wq */
+       /* schedule the send work on cp_wq */
        rcu_read_lock();
        if (!rds_destroy_pending(cp->cp_conn))
-               queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+               queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1);
        rcu_read_unlock();
 
        rds_message_put(rm);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index 7997a19d1da30670625ef483c90b70ec13d5a542..b7cf7f451430d27bf57b6da03d7e9f91242a0a32 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -327,7 +327,7 @@ void rds_tcp_data_ready(struct sock *sk)
        if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
                rcu_read_lock();
                if (!rds_destroy_pending(cp->cp_conn))
-                       queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+                       queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
                rcu_read_unlock();
        }
 out:
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 7d284ac7e81a5b6bb11d547ef35348bb4f4f387b..4e82c9644aa6a98001a2c13a838fd18850c9c531 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -201,7 +201,7 @@ void rds_tcp_write_space(struct sock *sk)
        rcu_read_lock();
        if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
            !rds_destroy_pending(cp->cp_conn))
-               queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+               queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
        rcu_read_unlock();
 
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 1f424cbfcbb47d18245af0c2cce528324649baf2..639302bab51ef2f8b509719b6b39376bfc5d8a70 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -89,8 +89,8 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
        set_bit(0, &cp->cp_conn->c_map_queued);
        rcu_read_lock();
        if (!rds_destroy_pending(cp->cp_conn)) {
-               queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-               queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+               queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
+               queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
        }
        rcu_read_unlock();
        cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
@@ -140,7 +140,7 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
                cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
                rcu_read_lock();
                if (!rds_destroy_pending(cp->cp_conn))
-                       queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+                       queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0);
                rcu_read_unlock();
                return;
        }
@@ -151,7 +151,7 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
                 conn, &conn->c_laddr, &conn->c_faddr);
        rcu_read_lock();
        if (!rds_destroy_pending(cp->cp_conn))
-               queue_delayed_work(rds_wq, &cp->cp_conn_w,
+               queue_delayed_work(cp->cp_wq, &cp->cp_conn_w,
                                   rand % cp->cp_reconnect_jiffies);
        rcu_read_unlock();
 
@@ -203,11 +203,11 @@ void rds_send_worker(struct work_struct *work)
                switch (ret) {
                case -EAGAIN:
                        rds_stats_inc(s_send_immediate_retry);
-                       queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+                       queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
                        break;
                case -ENOMEM:
                        rds_stats_inc(s_send_delayed_retry);
-                       queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
+                       queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 2);
                        break;
                default:
                        break;
@@ -228,11 +228,11 @@ void rds_recv_worker(struct work_struct *work)
                switch (ret) {
                case -EAGAIN:
                        rds_stats_inc(s_recv_immediate_retry);
-                       queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+                       queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
                        break;
                case -ENOMEM:
                        rds_stats_inc(s_recv_delayed_retry);
-                       queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
+                       queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 2);
                        break;
                default:
                        break;