]> git.ipfire.org Git - thirdparty/linux.git/commitdiff
rxrpc: Fix CPU time starvation in I/O thread
authorDavid Howells <dhowells@redhat.com>
Wed, 4 Dec 2024 07:46:41 +0000 (07:46 +0000)
committerJakub Kicinski <kuba@kernel.org>
Mon, 9 Dec 2024 21:48:26 +0000 (13:48 -0800)
Starvation can happen in the rxrpc I/O thread because it goes back to the
top of the I/O loop after it does any one thing without trying to give any
other connection or call CPU time.  Also, because it processes one call
packet at a time, it tries to do the retransmission loop after each ACK
without checking to see if there are other ACKs already in the queue that
can update the SACK state.

Fix this by:

 (1) Add a received-packet queue on each call.

 (2) Distribute packets from the master Rx queue to the individual call,
     conn and error queues and 'poking' calls to add them to the attend
     queue first thing in the I/O thread.

 (3) Go through all the attention-seeking connections and calls before
     going back to the top of the I/O thread.  Each queue is extracted as a
     whole and then gone through so that new additions to insert themselves
     into the queue.

 (4) Make the call event handler go through all the packets currently on
     the call's rx_queue before transmitting and retransmitting DATA
     packets.

 (5) Drop the skb argument from the call event handler as this is now
     replaced with the rx_queue.  Instead, keep track of whether we
     received a packet or an ACK for the tests that used to rely on that.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Link: https://patch.msgid.link/20241204074710.990092-14-dhowells@redhat.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_accept.c
net/rxrpc/call_event.c
net/rxrpc/call_object.c
net/rxrpc/conn_client.c
net/rxrpc/input.c
net/rxrpc/io_thread.c
net/rxrpc/peer_event.c

index 71f07e726a90348dda3b8f302988b0b4ae60730c..28fa7be31ff839aebadaaa940abcc4ee573e999b 100644 (file)
        EM(rxrpc_call_poke_conn_abort,          "Conn-abort")   \
        EM(rxrpc_call_poke_error,               "Error")        \
        EM(rxrpc_call_poke_idle,                "Idle")         \
+       EM(rxrpc_call_poke_rx_packet,           "Rx-packet")    \
        EM(rxrpc_call_poke_set_timeout,         "Set-timo")     \
        EM(rxrpc_call_poke_start,               "Start")        \
        EM(rxrpc_call_poke_timer,               "Timer")        \
 #define rxrpc_skb_traces \
        EM(rxrpc_skb_eaten_by_unshare,          "ETN unshare  ") \
        EM(rxrpc_skb_eaten_by_unshare_nomem,    "ETN unshar-nm") \
+       EM(rxrpc_skb_get_call_rx,               "GET call-rx  ") \
        EM(rxrpc_skb_get_conn_secured,          "GET conn-secd") \
        EM(rxrpc_skb_get_conn_work,             "GET conn-work") \
        EM(rxrpc_skb_get_last_nack,             "GET last-nack") \
        EM(rxrpc_skb_new_error_report,          "NEW error-rpt") \
        EM(rxrpc_skb_new_jumbo_subpacket,       "NEW jumbo-sub") \
        EM(rxrpc_skb_new_unshared,              "NEW unshared ") \
+       EM(rxrpc_skb_put_call_rx,               "PUT call-rx  ") \
        EM(rxrpc_skb_put_conn_secured,          "PUT conn-secd") \
        EM(rxrpc_skb_put_conn_work,             "PUT conn-work") \
        EM(rxrpc_skb_put_error_report,          "PUT error-rep") \
index 4386b2e6cca57fbc5bd6e593ce8968533c1fda1d..55cc68dd1b403e808c5424d37c51b90f89d9b501 100644 (file)
@@ -705,6 +705,7 @@ struct rxrpc_call {
 
        /* Received data tracking */
        struct sk_buff_head     recvmsg_queue;  /* Queue of packets ready for recvmsg() */
+       struct sk_buff_head     rx_queue;       /* Queue of packets for this call to receive */
        struct sk_buff_head     rx_oos_queue;   /* Queue of out of sequence packets */
 
        rxrpc_seq_t             rx_highest_seq; /* Higest sequence number received */
@@ -906,7 +907,7 @@ void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
 void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);
 
-bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb);
+bool rxrpc_input_call_event(struct rxrpc_call *call);
 
 /*
  * call_object.c
@@ -1352,6 +1353,13 @@ static inline bool after_eq(u32 seq1, u32 seq2)
         return (s32)(seq1 - seq2) >= 0;
 }
 
+static inline void rxrpc_queue_rx_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
+{
+       rxrpc_get_skb(skb, rxrpc_skb_get_call_rx);
+       __skb_queue_tail(&call->rx_queue, skb);
+       rxrpc_poke_call(call, rxrpc_call_poke_rx_packet);
+}
+
 /*
  * debug tracing
  */
index 0f5a1d77b890f83d61edf3b47bf8d5d7a420135e..a6776b1604bab68812b46be3c15a81f0c032adc9 100644 (file)
@@ -408,7 +408,7 @@ bool rxrpc_new_incoming_call(struct rxrpc_local *local,
        }
 
        _leave(" = %p{%d}", call, call->debug_id);
-       rxrpc_input_call_event(call, skb);
+       rxrpc_queue_rx_call_packet(call, skb);
        rxrpc_put_call(call, rxrpc_call_put_input);
        return true;
 
index 1f716f09d441537ef69d0b77b2be8387a83c0fab..ef47de3f41c6d8ce6bc6f2b8d2483ba6a4287b44 100644 (file)
@@ -324,10 +324,11 @@ static void rxrpc_send_initial_ping(struct rxrpc_call *call)
 /*
  * Handle retransmission and deferred ACK/abort generation.
  */
-bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
+bool rxrpc_input_call_event(struct rxrpc_call *call)
 {
+       struct sk_buff *skb;
        ktime_t now, t;
-       bool resend = false;
+       bool resend = false, did_receive = false, saw_ack = false;
        s32 abort_code;
 
        rxrpc_see_call(call, rxrpc_call_see_input);
@@ -337,9 +338,6 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
               call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
               call->events);
 
-       if (__rxrpc_call_is_complete(call))
-               goto out;
-
        /* Handle abort request locklessly, vs rxrpc_propose_abort(). */
        abort_code = smp_load_acquire(&call->send_abort);
        if (abort_code) {
@@ -348,11 +346,21 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
                goto out;
        }
 
-       if (skb && skb->mark == RXRPC_SKB_MARK_ERROR)
-               goto out;
+       while ((skb = __skb_dequeue(&call->rx_queue))) {
+               struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+
+               if (__rxrpc_call_is_complete(call) ||
+                   skb->mark == RXRPC_SKB_MARK_ERROR) {
+                       rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
+                       goto out;
+               }
+
+               saw_ack |= sp->hdr.type == RXRPC_PACKET_TYPE_ACK;
 
-       if (skb)
                rxrpc_input_call_packet(call, skb);
+               rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
+               did_receive = true;
+       }
 
        /* If we see our async-event poke, check for timeout trippage. */
        now = ktime_get_real();
@@ -418,12 +426,8 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
                               rxrpc_propose_ack_ping_for_keepalive);
        }
 
-       if (skb) {
-               struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-
-               if (sp->hdr.type == RXRPC_PACKET_TYPE_ACK)
-                       rxrpc_congestion_degrade(call);
-       }
+       if (saw_ack)
+               rxrpc_congestion_degrade(call);
 
        if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
                rxrpc_send_initial_ping(call);
@@ -494,7 +498,7 @@ out:
                if (call->security)
                        call->security->free_call_crypto(call);
        } else {
-               if (skb &&
+               if (did_receive &&
                    call->peer->ackr_adv_pmtud &&
                    call->peer->pmtud_pending)
                        rxrpc_send_probe_for_pmtud(call);
index 0df647d1d3a252c1fc3ed2f697945b9ccf4d384b..c026f16f891ebd63d215a01590f2ef1a6b82d208 100644 (file)
@@ -148,6 +148,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
        INIT_LIST_HEAD(&call->attend_link);
        INIT_LIST_HEAD(&call->tx_sendmsg);
        INIT_LIST_HEAD(&call->tx_buffer);
+       skb_queue_head_init(&call->rx_queue);
        skb_queue_head_init(&call->recvmsg_queue);
        skb_queue_head_init(&call->rx_oos_queue);
        init_waitqueue_head(&call->waitq);
@@ -536,6 +537,7 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 static void rxrpc_cleanup_ring(struct rxrpc_call *call)
 {
        rxrpc_purge_queue(&call->recvmsg_queue);
+       rxrpc_purge_queue(&call->rx_queue);
        rxrpc_purge_queue(&call->rx_oos_queue);
 }
 
index 86fb18bcd1885281c733027d2c5624b7196ec642..706631e6ac2f1074803d4427402670668053290e 100644 (file)
@@ -508,16 +508,18 @@ static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
 void rxrpc_connect_client_calls(struct rxrpc_local *local)
 {
        struct rxrpc_call *call;
+       LIST_HEAD(new_client_calls);
 
-       while ((call = list_first_entry_or_null(&local->new_client_calls,
-                                               struct rxrpc_call, wait_link))
-              ) {
+       spin_lock(&local->client_call_lock);
+       list_splice_tail_init(&local->new_client_calls, &new_client_calls);
+       spin_unlock(&local->client_call_lock);
+
+       while ((call = list_first_entry_or_null(&new_client_calls,
+                                               struct rxrpc_call, wait_link))) {
                struct rxrpc_bundle *bundle = call->bundle;
 
-               spin_lock(&local->client_call_lock);
                list_move_tail(&call->wait_link, &bundle->waiting_calls);
                rxrpc_see_call(call, rxrpc_call_see_waiting_call);
-               spin_unlock(&local->client_call_lock);
 
                if (rxrpc_bundle_has_space(bundle))
                        rxrpc_activate_channels(bundle);
index 8398fa10ee8d7e8c7989e4a1a18928f59e414445..96fe005c5e819600c222f4af2e654d317c47c424 100644 (file)
@@ -1124,5 +1124,5 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
                break;
        }
 
-       rxrpc_input_call_event(call, skb);
+       rxrpc_input_call_event(call);
 }
index bd6d4f5e97b4c08fd5e7ba9555de603f4ba4686d..bc678a299bd87553b5e7bff648ad60d2ed82eb53 100644 (file)
@@ -338,7 +338,6 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
        struct rxrpc_channel *chan;
        struct rxrpc_call *call = NULL;
        unsigned int channel;
-       bool ret;
 
        if (sp->hdr.securityIndex != conn->security_ix)
                return rxrpc_direct_abort(skb, rxrpc_eproto_wrong_security,
@@ -425,9 +424,9 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
                                               peer_srx, skb);
        }
 
-       ret = rxrpc_input_call_event(call, skb);
+       rxrpc_queue_rx_call_packet(call, skb);
        rxrpc_put_call(call, rxrpc_call_put_input);
-       return ret;
+       return true;
 }
 
 /*
@@ -444,6 +443,8 @@ int rxrpc_io_thread(void *data)
        ktime_t now;
 #endif
        bool should_stop;
+       LIST_HEAD(conn_attend_q);
+       LIST_HEAD(call_attend_q);
 
        complete(&local->io_thread_ready);
 
@@ -454,43 +455,25 @@ int rxrpc_io_thread(void *data)
        for (;;) {
                rxrpc_inc_stat(local->rxnet, stat_io_loop);
 
-               /* Deal with connections that want immediate attention. */
-               conn = list_first_entry_or_null(&local->conn_attend_q,
-                                               struct rxrpc_connection,
-                                               attend_link);
-               if (conn) {
-                       spin_lock_bh(&local->lock);
-                       list_del_init(&conn->attend_link);
-                       spin_unlock_bh(&local->lock);
-
-                       rxrpc_input_conn_event(conn, NULL);
-                       rxrpc_put_connection(conn, rxrpc_conn_put_poke);
-                       continue;
+               /* Inject a delay into packets if requested. */
+#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
+               now = ktime_get_real();
+               while ((skb = skb_peek(&local->rx_delay_queue))) {
+                       if (ktime_before(now, skb->tstamp))
+                               break;
+                       skb = skb_dequeue(&local->rx_delay_queue);
+                       skb_queue_tail(&local->rx_queue, skb);
                }
+#endif
 
-               if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
-                                      &local->client_conn_flags))
-                       rxrpc_discard_expired_client_conns(local);
-
-               /* Deal with calls that want immediate attention. */
-               if ((call = list_first_entry_or_null(&local->call_attend_q,
-                                                    struct rxrpc_call,
-                                                    attend_link))) {
-                       spin_lock_bh(&local->lock);
-                       list_del_init(&call->attend_link);
-                       spin_unlock_bh(&local->lock);
-
-                       trace_rxrpc_call_poked(call);
-                       rxrpc_input_call_event(call, NULL);
-                       rxrpc_put_call(call, rxrpc_call_put_poke);
-                       continue;
+               if (!skb_queue_empty(&local->rx_queue)) {
+                       spin_lock_irq(&local->rx_queue.lock);
+                       skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
+                       spin_unlock_irq(&local->rx_queue.lock);
                }
 
-               if (!list_empty(&local->new_client_calls))
-                       rxrpc_connect_client_calls(local);
-
-               /* Process received packets and errors. */
-               if ((skb = __skb_dequeue(&rx_queue))) {
+               /* Distribute packets and errors. */
+               while ((skb = __skb_dequeue(&rx_queue))) {
                        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
                        switch (skb->mark) {
                        case RXRPC_SKB_MARK_PACKET:
@@ -514,27 +497,46 @@ int rxrpc_io_thread(void *data)
                                rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
                                break;
                        }
-                       continue;
                }
 
-               /* Inject a delay into packets if requested. */
-#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
-               now = ktime_get_real();
-               while ((skb = skb_peek(&local->rx_delay_queue))) {
-                       if (ktime_before(now, skb->tstamp))
-                               break;
-                       skb = skb_dequeue(&local->rx_delay_queue);
-                       skb_queue_tail(&local->rx_queue, skb);
+               /* Deal with connections that want immediate attention. */
+               spin_lock_bh(&local->lock);
+               list_splice_tail_init(&local->conn_attend_q, &conn_attend_q);
+               spin_unlock_bh(&local->lock);
+
+               while ((conn = list_first_entry_or_null(&conn_attend_q,
+                                                       struct rxrpc_connection,
+                                                       attend_link))) {
+                       spin_lock_bh(&local->lock);
+                       list_del_init(&conn->attend_link);
+                       spin_unlock_bh(&local->lock);
+                       rxrpc_input_conn_event(conn, NULL);
+                       rxrpc_put_connection(conn, rxrpc_conn_put_poke);
                }
-#endif
 
-               if (!skb_queue_empty(&local->rx_queue)) {
-                       spin_lock_irq(&local->rx_queue.lock);
-                       skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
-                       spin_unlock_irq(&local->rx_queue.lock);
-                       continue;
+               if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
+                                      &local->client_conn_flags))
+                       rxrpc_discard_expired_client_conns(local);
+
+               /* Deal with calls that want immediate attention. */
+               spin_lock_bh(&local->lock);
+               list_splice_tail_init(&local->call_attend_q, &call_attend_q);
+               spin_unlock_bh(&local->lock);
+
+               while ((call = list_first_entry_or_null(&call_attend_q,
+                                                       struct rxrpc_call,
+                                                       attend_link))) {
+                       spin_lock_bh(&local->lock);
+                       list_del_init(&call->attend_link);
+                       spin_unlock_bh(&local->lock);
+                       trace_rxrpc_call_poked(call);
+                       rxrpc_input_call_event(call);
+                       rxrpc_put_call(call, rxrpc_call_put_poke);
                }
 
+               if (!list_empty(&local->new_client_calls))
+                       rxrpc_connect_client_calls(local);
+
                set_current_state(TASK_INTERRUPTIBLE);
                should_stop = kthread_should_stop();
                if (!skb_queue_empty(&local->rx_queue) ||
index 8fc9464a960caa87598c05b6c06ce4fed34cda66..ff30e0c055079d9fb8b58d1f1bc5e84796d8d657 100644 (file)
@@ -224,7 +224,7 @@ static void rxrpc_distribute_error(struct rxrpc_peer *peer, struct sk_buff *skb,
 
                rxrpc_see_call(call, rxrpc_call_see_distribute_error);
                rxrpc_set_call_completion(call, compl, 0, -err);
-               rxrpc_input_call_event(call, skb);
+               rxrpc_input_call_event(call);
 
                spin_lock(&peer->lock);
        }