git.ipfire.org Git - thirdparty/wireguard-go.git/commitdiff
device: use channel close to shut down and drain outbound channel
author: Josh Bleecher Snyder <josh@tailscale.com>
Tue, 15 Dec 2020 23:54:48 +0000 (15:54 -0800)
committer: Josh Bleecher Snyder <josh@tailscale.com>
Thu, 17 Dec 2020 00:16:26 +0000 (16:16 -0800)
This is a similar treatment to the handling of the encryption
channel found a few commits ago: Use the closing of the channel
to manage goroutine lifetime and shutdown.
It is considerably simpler because there is only a single writer.

Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
device/peer.go
device/send.go

index 31b75c7b3375871866bd063c7bf8958f17914f92..c0941603600295b8c37243880fc51889ae35f5c9 100644 (file)
@@ -17,7 +17,7 @@ import (
 )
 
 const (
-       PeerRoutineNumber = 3
+       PeerRoutineNumber = 2
 )
 
 type Peer struct {
@@ -287,7 +287,6 @@ func (peer *Peer) Stop() {
 
        peer.queue.Lock()
        close(peer.queue.nonce)
-       close(peer.queue.outbound)
        close(peer.queue.inbound)
        peer.queue.Unlock()
 
index 1b16edd0e4c41f19dc7a1d7110ac1c813508f9e7..1f71f7925015bd46926ec154e48a257ca78d1726 100644 (file)
@@ -372,6 +372,7 @@ func (peer *Peer) RoutineNonce() {
                logDebug.Println(peer, "- Routine: nonce worker - stopped")
                peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
                device.queue.encryption.wg.Done() // no more writes from us
+               close(peer.queue.outbound)        // no more writes to this channel
                peer.routines.stopping.Done()
        }()
 
@@ -545,64 +546,43 @@ func (peer *Peer) RoutineSequentialSender() {
        logDebug := device.log.Debug
        logError := device.log.Error
 
-       defer func() {
-               for {
-                       select {
-                       case elem, ok := <-peer.queue.outbound:
-                               if ok {
-                                       elem.Lock()
-                                       if !elem.IsDropped() {
-                                               device.PutMessageBuffer(elem.buffer)
-                                               elem.Drop()
-                                       }
-                                       device.PutOutboundElement(elem)
-                               }
-                       default:
-                               goto out
-                       }
-               }
-       out:
-               logDebug.Println(peer, "- Routine: sequential sender - stopped")
-               peer.routines.stopping.Done()
-       }()
-
+       defer logDebug.Println(peer, "- Routine: sequential sender - stopped")
        logDebug.Println(peer, "- Routine: sequential sender - started")
 
-       for {
-               select {
-
-               case <-peer.routines.stop:
-                       return
-
-               case elem, ok := <-peer.queue.outbound:
-
-                       if !ok {
-                               return
-                       }
-
-                       elem.Lock()
-                       if elem.IsDropped() {
-                               device.PutOutboundElement(elem)
-                               continue
-                       }
-
-                       peer.timersAnyAuthenticatedPacketTraversal()
-                       peer.timersAnyAuthenticatedPacketSent()
-
-                       // send message and return buffer to pool
-
-                       err := peer.SendBuffer(elem.packet)
-                       if len(elem.packet) != MessageKeepaliveSize {
-                               peer.timersDataSent()
-                       }
+       for elem := range peer.queue.outbound {
+               elem.Lock()
+               if elem.IsDropped() {
+                       device.PutOutboundElement(elem)
+                       continue
+               }
+               if !peer.isRunning.Get() {
+                       // peer has been stopped; return re-usable elems to the shared pool.
+                       // This is an optimization only. It is possible for the peer to be stopped
+                       // immediately after this check, in which case, elem will get processed.
+                       // The timers and SendBuffer code are resilient to a few stragglers.
+                       // TODO(josharian): rework peer shutdown order to ensure
+                       // that we never accidentally keep timers alive longer than necessary.
                        device.PutMessageBuffer(elem.buffer)
                        device.PutOutboundElement(elem)
-                       if err != nil {
-                               logError.Println(peer, "- Failed to send data packet", err)
-                               continue
-                       }
+                       continue
+               }
+
+               peer.timersAnyAuthenticatedPacketTraversal()
+               peer.timersAnyAuthenticatedPacketSent()
 
-                       peer.keepKeyFreshSending()
+               // send message and return buffer to pool
+
+               err := peer.SendBuffer(elem.packet)
+               if len(elem.packet) != MessageKeepaliveSize {
+                       peer.timersDataSent()
                }
+               device.PutMessageBuffer(elem.buffer)
+               device.PutOutboundElement(elem)
+               if err != nil {
+                       logError.Println(peer, "- Failed to send data packet", err)
+                       continue
+               }
+
+               peer.keepKeyFreshSending()
        }
 }